Merge remote-tracking branch 'upstream/develop'

This commit is contained in:
Romain Dorgueil
2018-01-08 07:41:16 +01:00
231 changed files with 7133 additions and 3877 deletions

4
.gitignore vendored
View File

@ -11,6 +11,7 @@
*.so *.so
*.spec *.spec
.*.sw? .*.sw?
.DS_Store
.Python .Python
.cache .cache
.coverage .coverage
@ -22,9 +23,10 @@
.ipynb_checkpoints .ipynb_checkpoints
.python-version .python-version
/.idea /.idea
/.medikit-pipeline
/.release /.release
/bonobo/contrib/jupyter/js/node_modules/
/bonobo/examples/work_in_progress/ /bonobo/examples/work_in_progress/
/bonobo/ext/jupyter/js/node_modules/
/build/ /build/
/coverage.xml /coverage.xml
/dist/ /dist/

View File

@ -1,4 +1,11 @@
[style] [style]
based_on_style = pep8 based_on_style = pep8
column_limit = 120 column_limit = 120
allow_multiline_lambdas = false
allow_multiline_dictionary_keys = false
coalesce_brackets = true
dedent_closing_brackets = true dedent_closing_brackets = true
join_multiple_lines = true
spaces_before_comment = 2
split_before_first_argument = true
split_complex_comprehension = true

View File

@ -4,7 +4,8 @@ python:
- 3.5-dev - 3.5-dev
- 3.6 - 3.6
- 3.6-dev - 3.6-dev
- nightly # - 3.7-dev
# - nightly
install: install:
- make install-dev - make install-dev
- pip install coveralls - pip install coveralls

View File

@ -1 +1,2 @@
include *.txt include *.txt
recursive-include bonobo *.py-tpl

View File

@ -1,4 +1,4 @@
# Generated by Medikit 0.4a5 on 2017-10-30. # Generated by Medikit 0.4.5 on 2018-01-01.
# All changes will be overriden. # All changes will be overriden.
PACKAGE ?= bonobo PACKAGE ?= bonobo
@ -10,6 +10,7 @@ PYTHON_REQUIREMENTS_DEV_FILE ?= requirements-dev.txt
QUICK ?= QUICK ?=
PIP ?= $(PYTHON_DIRNAME)/pip PIP ?= $(PYTHON_DIRNAME)/pip
PIP_INSTALL_OPTIONS ?= PIP_INSTALL_OPTIONS ?=
VERSION ?= $(shell git describe 2>/dev/null || git rev-parse --short HEAD)
PYTEST ?= $(PYTHON_DIRNAME)/pytest PYTEST ?= $(PYTHON_DIRNAME)/pytest
PYTEST_OPTIONS ?= --capture=no --cov=$(PACKAGE) --cov-report html PYTEST_OPTIONS ?= --capture=no --cov=$(PACKAGE) --cov-report html
SPHINX_BUILD ?= $(PYTHON_DIRNAME)/sphinx-build SPHINX_BUILD ?= $(PYTHON_DIRNAME)/sphinx-build
@ -18,7 +19,6 @@ SPHINX_SOURCEDIR ?= docs
SPHINX_BUILDDIR ?= $(SPHINX_SOURCEDIR)/_build SPHINX_BUILDDIR ?= $(SPHINX_SOURCEDIR)/_build
YAPF ?= $(PYTHON) -m yapf YAPF ?= $(PYTHON) -m yapf
YAPF_OPTIONS ?= -rip YAPF_OPTIONS ?= -rip
VERSION ?= $(shell git describe 2>/dev/null || echo dev)
.PHONY: $(SPHINX_SOURCEDIR) clean format install install-dev test update update-requirements .PHONY: $(SPHINX_SOURCEDIR) clean format install install-dev test update update-requirements

View File

@ -7,6 +7,8 @@ python = require('python')
sphinx = require('sphinx') sphinx = require('sphinx')
yapf = require('yapf') yapf = require('yapf')
# python.set_versions('3.5', '3.6', '3.7') --> not yet implemented in medikit
python.setup( python.setup(
name='bonobo', name='bonobo',
description='Bonobo, a simple, modern and atomic extract-transform-load toolkit for python 3.5+.', description='Bonobo, a simple, modern and atomic extract-transform-load toolkit for python 3.5+.',
@ -18,9 +20,9 @@ python.setup(
data_files=[ data_files=[
( (
'share/jupyter/nbextensions/bonobo-jupyter', [ 'share/jupyter/nbextensions/bonobo-jupyter', [
'bonobo/ext/jupyter/static/extension.js', 'bonobo/contrib/jupyter/static/extension.js',
'bonobo/ext/jupyter/static/index.js', 'bonobo/contrib/jupyter/static/index.js',
'bonobo/ext/jupyter/static/index.js.map', 'bonobo/contrib/jupyter/static/index.js.map',
] ]
), ),
], ],
@ -29,34 +31,42 @@ python.setup(
'bonobo = bonobo.commands:entrypoint', 'bonobo = bonobo.commands:entrypoint',
], ],
'bonobo.commands': [ 'bonobo.commands': [
'convert = bonobo.commands.convert:register', 'convert = bonobo.commands.convert:ConvertCommand',
'init = bonobo.commands.init:register', 'download = bonobo.commands.download:DownloadCommand',
'inspect = bonobo.commands.inspect:register', 'examples = bonobo.commands.examples:ExamplesCommand',
'run = bonobo.commands.run:register', 'init = bonobo.commands.init:InitCommand',
'version = bonobo.commands.version:register', 'inspect = bonobo.commands.inspect:InspectCommand',
'run = bonobo.commands.run:RunCommand',
'version = bonobo.commands.version:VersionCommand',
], ],
} }
) )
python.add_requirements( python.add_requirements(
'colorama >=0.3,<1.0', 'fs ~=2.0',
'fs >=2.0,<3.0', 'graphviz >=0.8,<0.9',
'packaging >=16,<17', 'jinja2 ~=2.9',
'psutil >=5.2,<6.0', 'mondrian ~=0.6',
'requests >=2.0,<3.0', 'packaging ~=16.0',
'stevedore >=1.21,<2.0', 'psutil ~=5.4',
'python-slugify ~=1.2.0',
'requests ~=2.0',
'stevedore ~=1.27',
'whistle ~=1.0',
dev=[ dev=[
'cookiecutter >=1.5,<1.6', 'pytest-sugar >=0.9,<0.10',
'pytest-sugar >=0.8,<0.9', 'pytest-timeout ~=1.0',
'pytest-timeout >=1,<2',
], ],
docker=[ docker=[
'bonobo-docker', 'bonobo-docker ~=0.6.0a1',
], ],
jupyter=[ jupyter=[
'jupyter >=1.0,<1.1', 'ipywidgets ~=6.0',
'ipywidgets >=6.0.0,<7', 'jupyter ~=1.0',
] ],
sqlalchemy=[
'bonobo-sqlalchemy ~=0.6.0a1',
],
) )
# vim: ft=python: # vim: ft=python:

66
RELEASE-0.6.rst Normal file
View File

@ -0,0 +1,66 @@
Problems
========
Failed to display Jupyter Widget of type BonoboWidget.
If you're reading this message in Jupyter Notebook or JupyterLab, it may mean that the widgets JavaScript is still loading. If this message persists, it likely means that the widgets JavaScript library is either not installed or not enabled. See the Jupyter Widgets Documentation for setup instructions.
If you're reading this message in another notebook frontend (for example, a static rendering on GitHub or NBViewer), it may mean that your frontend doesn't currently support widgets.
.. code-block:: shell-session
$ jupyter nbextension enable --py widgetsnbextension
$ jupyter nbextension install --py --symlink bonobo.contrib.jupyter
$ jupyter nbextension enable --py bonobo.contrib.jupyter
Todo
====
* Pretty printer
Options for Bags
================
tuple only
pros : simple
cons :
- how to name columns / store headers ?
- how to return a dictionary
yield keys('foo', 'bar', 'baz')
yield 'a', 'b', 'c'
CHANGELOG
=========
* Bags changed to something way closer to namedtuples.
* Better at managing memory
* Less flexible for kwargs usage, but much more standard and portable from one version of Python to another
* More future proof for different execution strategies
* May lead to changes in your current transformation
* A given transformation now has an input and an output "type", which is either manually set by the user or
detected from the first item sent through a queue. It is a restriction on how bonobo can be used, but
it will help achieve better predictability.
* No more "graph" instance detection. This was misleading for new users, and not really pythonic. The
recommended way to start with bonobo is just to use one python file with a __main__ block, and if the
project grows, include this file in a package, either new or existing one. The init cli changed to
help you generate files or packages. That also means that we do not generate things with cookiecutter
anymore.
* Jupyter enhancements
* Graphviz support
* New nodes in stdlib
* Registry, used for conversions but also for your own integrations.

56
benchmarks/parameters.py Normal file
View File

@ -0,0 +1,56 @@
"""
Compare passing a dict to passing a dict as kwargs to a stupid transformation
Last results (1 mill calls):
j1 1.5026444319955772
k1 1.8377482700016117
j2 1.1962292949901894
k2 1.5545833489886718
j3 1.0014333260041894
k3 1.353256585993222
"""
import json
import timeit
# Benchmark subjects. Each "jN" takes the payload as one positional dict, each "kN" takes the same payload
# unpacked as keyword arguments; the bodies of a jN/kN pair are identical so that only the calling convention
# differs. The bodies are what gets timed, so they are deliberately left as-is.
# Pair 1: build a new dict with the payload unpacked between two extra keys.
def j1(d):
return {'prepend': 'foo', **d, 'append': 'bar'}
def k1(**d):
return {'prepend': 'foo', **d, 'append': 'bar'}
# Pair 2: shallow-copy the payload into a new dict.
def j2(d):
return {**d}
def k2(**d):
return {**d}
# Pair 3: ignore the payload entirely, leaving only the cost of the call itself.
def j3(d):
return None
def k3(**d):
return None
if __name__ == '__main__':
    import os

    # Locate the fixture next to this script, so the benchmark can be launched from any working directory
    # (a bare 'person.json' only resolves when the current directory is the benchmarks folder).
    fixture_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'person.json')
    with open(fixture_path, encoding='utf-8') as f:
        json_data = json.load(f)

    # NOTE: the payload is embedded in each timed statement through its repr, so every timed call also
    # evaluates the dict literal. Timings are meant to be compared pairwise (jN against kN).
    for i in 1, 2, 3:
        for name, statement in (
            ('j{}'.format(i), "j{}({!r})".format(i, json_data)),
            ('k{}'.format(i), "k{}(**{!r})".format(i, json_data)),
        ):
            print(name, timeit.timeit(statement, setup="from __main__ import {}".format(name)))

46
benchmarks/person.json Normal file
View File

@ -0,0 +1,46 @@
{
"@context": "http://schema.org",
"@type": "MusicEvent",
"location": {
"@type": "MusicVenue",
"name": "Chicago Symphony Center",
"address": "220 S. Michigan Ave, Chicago, Illinois, USA"
},
"name": "Shostakovich Leningrad",
"offers": {
"@type": "Offer",
"url": "/examples/ticket/12341234",
"price": "40",
"priceCurrency": "USD",
"availability": "http://schema.org/InStock"
},
"performer": [
{
"@type": "MusicGroup",
"name": "Chicago Symphony Orchestra",
"sameAs": [
"http://cso.org/",
"http://en.wikipedia.org/wiki/Chicago_Symphony_Orchestra"
]
},
{
"@type": "Person",
"image": "/examples/jvanzweden_s.jpg",
"name": "Jaap van Zweden",
"sameAs": "http://www.jaapvanzweden.com/"
}
],
"startDate": "2014-05-23T20:00",
"workPerformed": [
{
"@type": "CreativeWork",
"name": "Britten Four Sea Interludes and Passacaglia from Peter Grimes",
"sameAs": "http://en.wikipedia.org/wiki/Peter_Grimes"
},
{
"@type": "CreativeWork",
"name": "Shostakovich Symphony No. 7 (Leningrad)",
"sameAs": "http://en.wikipedia.org/wiki/Symphony_No._7_(Shostakovich)"
}
]
}

File diff suppressed because one or more lines are too long

View File

@ -1,28 +1,20 @@
import logging from bonobo.execution.strategies import create_strategy
from bonobo.nodes import __all__ as _all_nodes
from bonobo.nodes import CsvReader, CsvWriter, FileReader, FileWriter, Filter, JsonReader, JsonWriter, Limit, \ from bonobo.nodes import *
PickleReader, PickleWriter, PrettyPrinter, RateLimited, Tee, arg0_to_kwargs, count, identity, kwargs_to_arg0, noop from bonobo.structs import Graph
from bonobo.strategies import create_strategy
from bonobo.structs import Bag, ErrorBag, Graph, Token
from bonobo.util import get_name from bonobo.util import get_name
from bonobo.util.api import ApiHelper
from bonobo.util.environ import parse_args, get_argument_parser
__all__ = [] __all__ = []
api = ApiHelper(__all__)
def register_api(x, __all__=__all__):
__all__.append(get_name(x))
return x
def register_api_group(*args): @api.register_graph
for attr in args: def run(graph, *, plugins=None, services=None, strategy=None):
register_api(attr)
@register_api
def run(graph, strategy=None, plugins=None, services=None):
""" """
Main entry point of bonobo. It takes a graph and creates all the necessary plumbery around to execute it. Main entry point of bonobo. It takes a graph and creates all the necessary plumbing around to execute it.
The only necessary argument is a :class:`Graph` instance, containing the logic you actually want to execute. The only necessary argument is a :class:`Graph` instance, containing the logic you actually want to execute.
@ -35,12 +27,11 @@ def run(graph, strategy=None, plugins=None, services=None):
You'll probably want to provide a services dictionary mapping service names to service instances. You'll probably want to provide a services dictionary mapping service names to service instances.
:param Graph graph: The :class:`Graph` to execute. :param Graph graph: The :class:`Graph` to execute.
:param str strategy: The :class:`bonobo.strategies.base.Strategy` to use. :param str strategy: The :class:`bonobo.execution.strategies.base.Strategy` to use.
:param list plugins: The list of plugins to enhance execution. :param list plugins: The list of plugins to enhance execution.
:param dict services: The implementations of services this graph will use. :param dict services: The implementations of services this graph will use.
:return bonobo.execution.graph.GraphExecutionContext: :return bonobo.execution.graph.GraphExecutionContext:
""" """
strategy = create_strategy(strategy)
plugins = plugins or [] plugins = plugins or []
@ -49,14 +40,18 @@ def run(graph, strategy=None, plugins=None, services=None):
if not settings.QUIET.get(): # pragma: no cover if not settings.QUIET.get(): # pragma: no cover
if _is_interactive_console(): if _is_interactive_console():
from bonobo.ext.console import ConsoleOutputPlugin import mondrian
mondrian.setup(excepthook=True)
from bonobo.plugins.console import ConsoleOutputPlugin
if ConsoleOutputPlugin not in plugins: if ConsoleOutputPlugin not in plugins:
plugins.append(ConsoleOutputPlugin) plugins.append(ConsoleOutputPlugin)
if _is_jupyter_notebook(): if _is_jupyter_notebook():
try: try:
from bonobo.ext.jupyter import JupyterOutputPlugin from bonobo.contrib.jupyter import JupyterOutputPlugin
except ImportError: except ImportError:
import logging
logging.warning( logging.warning(
'Failed to load jupyter widget. Easiest way is to install the optional "jupyter" ' 'Failed to load jupyter widget. Easiest way is to install the optional "jupyter" '
'dependencies with «pip install bonobo[jupyter]», but you can also install a specific ' 'dependencies with «pip install bonobo[jupyter]», but you can also install a specific '
@ -66,18 +61,39 @@ def run(graph, strategy=None, plugins=None, services=None):
if JupyterOutputPlugin not in plugins: if JupyterOutputPlugin not in plugins:
plugins.append(JupyterOutputPlugin) plugins.append(JupyterOutputPlugin)
import logging
logging.getLogger().setLevel(settings.LOGGING_LEVEL.get())
strategy = create_strategy(strategy)
return strategy.execute(graph, plugins=plugins, services=services) return strategy.execute(graph, plugins=plugins, services=services)
# bonobo.structs def _inspect_as_graph(graph):
register_api_group(Bag, ErrorBag, Graph, Token) return graph._repr_dot_()
# bonobo.strategies
register_api(create_strategy) _inspect_formats = {'graph': _inspect_as_graph}
@api.register_graph
def inspect(graph, *, plugins=None, services=None, strategy=None, format):
    """
    Print a representation of a graph, rendered by the formatter registered under ``format``.

    :param graph: The graph to inspect.
    :param plugins: Not used by this function.
    :param services: Not used by this function.
    :param strategy: Not used by this function.
    :param str format: Key of the formatter to use, looked up in ``_inspect_formats``.
    :raises NotImplementedError: If ``format`` is not a key of ``_inspect_formats``.
    """
    # NOTE(review): plugins/services/strategy presumably exist for signature parity with run() -- confirm.
    if format not in _inspect_formats:
        raise NotImplementedError(
            'Output format {} not implemented. Choices are: {}.'.format(
                format, ', '.join(sorted(_inspect_formats.keys()))
            )
        )
    print(_inspect_formats[format](graph))
# data structures
api.register_group(Graph)
# execution strategies
api.register_group(create_strategy)
# Shortcut to filesystem2's open_fs, that we make available there for convenience. # Shortcut to filesystem2's open_fs, that we make available there for convenience.
@register_api @api.register
def open_fs(fs_url=None, *args, **kwargs): def open_fs(fs_url=None, *args, **kwargs):
""" """
Wraps :func:`fs.open_fs` function with a few candies. Wraps :func:`fs.open_fs` function with a few candies.
@ -101,26 +117,33 @@ def open_fs(fs_url=None, *args, **kwargs):
return _open_fs(expanduser(str(fs_url)), *args, **kwargs) return _open_fs(expanduser(str(fs_url)), *args, **kwargs)
# bonobo.nodes # standard transformations
register_api_group( api.register_group(
CsvReader, CsvReader,
CsvWriter, CsvWriter,
FileReader, FileReader,
FileWriter, FileWriter,
Filter, Filter,
FixedWindow,
Format,
JsonReader, JsonReader,
JsonWriter, JsonWriter,
LdjsonReader,
LdjsonWriter,
Limit, Limit,
OrderFields,
PickleReader, PickleReader,
PickleWriter, PickleWriter,
PrettyPrinter, PrettyPrinter,
RateLimited, RateLimited,
Rename,
SetFields,
Tee, Tee,
arg0_to_kwargs, UnpackItems,
count, count,
identity, identity,
kwargs_to_arg0,
noop, noop,
check=_all_nodes,
) )
@ -136,13 +159,16 @@ def _is_jupyter_notebook():
return False return False
@register_api @api.register
def get_examples_path(*pathsegments): def get_examples_path(*pathsegments):
import os import os
import pathlib import pathlib
return str(pathlib.Path(os.path.dirname(__file__), 'examples', *pathsegments)) return str(pathlib.Path(os.path.dirname(__file__), 'examples', *pathsegments))
@register_api @api.register
def open_examples_fs(*pathsegments): def open_examples_fs(*pathsegments):
return open_fs(get_examples_path(*pathsegments)) return open_fs(get_examples_path(*pathsegments))
api.register_group(get_argument_parser, parse_args)

View File

@ -1 +1 @@
__version__ = '0.5.2' __version__ = '0.6.0'

View File

@ -1,11 +1,23 @@
import argparse import argparse
import logging
from bonobo import logging, settings import mondrian
from bonobo import settings
logger = logging.get_logger() from bonobo.commands.base import BaseCommand, BaseGraphCommand
def entrypoint(args=None): def entrypoint(args=None):
"""
Main callable for "bonobo" entrypoint.
Will load commands from "bonobo.commands" entrypoints, using stevedore.
"""
mondrian.setup(excepthook=True)
logger = logging.getLogger()
logger.setLevel(settings.LOGGING_LEVEL.get())
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('--debug', '-D', action='store_true') parser.add_argument('--debug', '-D', action='store_true')
@ -17,7 +29,15 @@ def entrypoint(args=None):
def register_extension(ext, commands=commands): def register_extension(ext, commands=commands):
try: try:
parser = subparsers.add_parser(ext.name) parser = subparsers.add_parser(ext.name)
commands[ext.name] = ext.plugin(parser) if isinstance(ext.plugin, type) and issubclass(ext.plugin, BaseCommand):
# current way, class based.
cmd = ext.plugin()
cmd.add_arguments(parser)
cmd.__name__ = ext.name
commands[ext.name] = cmd.handle
else:
# old school, function based.
commands[ext.name] = ext.plugin(parser)
except Exception: except Exception:
logger.exception('Error while loading command {}.'.format(ext.name)) logger.exception('Error while loading command {}.'.format(ext.name))
@ -25,11 +45,17 @@ def entrypoint(args=None):
mgr = ExtensionManager(namespace='bonobo.commands') mgr = ExtensionManager(namespace='bonobo.commands')
mgr.map(register_extension) mgr.map(register_extension)
args = parser.parse_args(args).__dict__ parsed_args = parser.parse_args(args).__dict__
if args.pop('debug', False):
if parsed_args.pop('debug', False):
settings.DEBUG.set(True) settings.DEBUG.set(True)
settings.LOGGING_LEVEL.set(logging.DEBUG) settings.LOGGING_LEVEL.set(logging.DEBUG)
logging.set_level(settings.LOGGING_LEVEL.get()) logger.setLevel(settings.LOGGING_LEVEL.get())
logger.debug('Command: ' + args['command'] + ' Arguments: ' + repr(args)) logger.debug('Command: ' + parsed_args['command'] + ' Arguments: ' + repr(parsed_args))
commands[args.pop('command')](**args)
# Get command handler, execute, rinse.
command = commands[parsed_args.pop('command')]
command(**parsed_args)
return 0

129
bonobo/commands/base.py Normal file
View File

@ -0,0 +1,129 @@
import argparse
import logging
import runpy
import sys
from contextlib import contextmanager
import bonobo.util.environ
from bonobo.util import get_name
from bonobo.util.environ import get_argument_parser, parse_args
class BaseCommand:
    """
    Common ancestor of the CLI commands.

    Subclasses declare their arguments in :meth:`add_arguments` and implement their behaviour in :meth:`handle`.
    """

    @property
    def logger(self):
        """Logger named after this command (using ``get_name``), created on first access and then reused."""
        if not hasattr(self, '_logger'):
            self._logger = logging.getLogger(get_name(self))
        return self._logger

    def add_arguments(self, parser):
        """
        Hook for subclasses to register their own arguments on ``parser``. Does nothing by default.
        """

    def handle(self, *args, **options):
        """
        Run the command. Subclasses must override it, the base implementation always raises.

        :raises NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError('Subclasses of BaseCommand must provide a handle() method')
class BaseGraphCommand(BaseCommand):
"""
Base class for CLI commands that depend on a graph definition, read either from a file or from a module.

The target is executed as ``__main__`` while ``bonobo.run`` is replaced by a recorder (see ``read``), and the
recorded graph is then handed over to ``handler``.
"""
# Forwarded as the ``required`` flag of the mutually exclusive file / -m argument group.
required = True
# Callable invoked by do_handle() as handler(graph, **options); do_handle() raises while it is falsy.
handler = None
# Registers the target arguments, then passes the parser through get_argument_parser() and returns the result.
def add_arguments(self, parser):
# target arguments (cannot provide both).
source_group = parser.add_mutually_exclusive_group(required=self.required)
source_group.add_argument('file', nargs='?', type=str)
source_group.add_argument('-m', dest='mod', type=str)
# add arguments to enforce system environment.
parser = get_argument_parser(parser)
return parser
# Hook for subclasses to consume or validate command specific options; returns the options unchanged here.
def parse_options(self, **options):
return options
# Entry point: filters the options, reads the graph from the target, then delegates to do_handle() with both
# the keyword arguments the target gave to run() and the remaining command options.
def handle(self, file, mod, **options):
options = self.parse_options(**options)
with self.read(file, mod, **options) as (graph, graph_execution_options, options):
return self.do_handle(graph, **graph_execution_options, **options)
# Calls the configured handler with the graph; raises RuntimeError when no handler is set.
def do_handle(self, graph, **options):
if not self.handler:
raise RuntimeError('{} has no handler defined.'.format(get_name(self)))
return self.handler(graph, **options)
# Context manager yielding (graph, keyword arguments given to run(), options).
# Raises RuntimeError when neither file nor mod is given, or when the target never called run().
@contextmanager
def read(self, file, mod, **options):
_graph, _graph_execution_options = None, None
# Stand-in for bonobo.run: remembers the graph and keyword arguments instead of executing anything.
def _record(graph, **graph_execution_options):
nonlocal _graph, _graph_execution_options
_graph, _graph_execution_options = graph, graph_execution_options
# NOTE(review): parse_args() comes from bonobo.util.environ and is not visible here; it is used as a context
# manager whose value replaces ``options`` -- confirm what it does with them.
with _override_runner(_record), parse_args(options) as options:
# The target sees itself as the only element of sys.argv; the original value is restored afterwards.
_argv = sys.argv
try:
if file:
sys.argv = [file]
self._run_path(file)
elif mod:
sys.argv = [mod]
self._run_module(mod)
else:
raise RuntimeError('No target provided.')
finally:
sys.argv = _argv
# NOTE(review): indentation is not visible in this view; the check and the yield below presumably still
# belong to the ``with`` block above -- confirm against the real file.
if _graph is None:
raise RuntimeError('Could not find graph.')
yield _graph, _graph_execution_options, options
# Executes a path as if it were the main script.
def _run_path(self, file):
return runpy.run_path(file, run_name='__main__')
# Executes a module as if it were run with ``python -m``.
def _run_module(self, mod):
return runpy.run_module(mod, run_name='__main__')
@contextmanager
def _override_runner(runner):
    """
    Context manager that temporarily replaces `bonobo.run` with `runner`, and
    `bonobo.util.environ.get_argument_parser` with a version that adds nothing to the parser.

    Both original callables are put back on exit, even if the wrapped code raises.

    :param runner: the callable that will handle the `run()` logic.
    """
    import bonobo

    environ = bonobo.util.environ
    saved_parser_factory, saved_run = environ.get_argument_parser, bonobo.run

    # The original get_argument_parser would create or update a parser with the environment options. They were
    # already parsed at this point, so the replacement only hands back the given parser, or a new empty one.
    def get_argument_parser(parser=None):
        return parser or argparse.ArgumentParser()

    try:
        environ.get_argument_parser = get_argument_parser
        bonobo.run = runner
        yield runner
    finally:
        # Put the saved callables back.
        environ.get_argument_parser = saved_parser_factory
        bonobo.run = saved_run

View File

@ -1,81 +1,99 @@
import mimetypes
import os
import bonobo import bonobo
from bonobo.commands import BaseCommand
SHORTCUTS = { from bonobo.registry import READER, WRITER, default_registry
'csv': 'text/csv', from bonobo.util.resolvers import _resolve_transformations, _resolve_options
'json': 'application/json',
'pickle': 'pickle',
'plain': 'text/plain',
'text': 'text/plain',
'txt': 'text/plain',
}
REGISTRY = {
'application/json': (bonobo.JsonReader, bonobo.JsonWriter),
'pickle': (bonobo.PickleReader, bonobo.PickleWriter),
'text/csv': (bonobo.CsvReader, bonobo.CsvWriter),
'text/plain': (bonobo.FileReader, bonobo.FileWriter),
}
READER = 'reader'
WRITER = 'writer'
def resolve_factory(name, filename, factory_type): class ConvertCommand(BaseCommand):
""" def add_arguments(self, parser):
Try to resolve which transformation factory to use for this filename. User eventually provided a name, which has parser.add_argument('input_filename', help='Input filename.')
priority, otherwise we try to detect it using the mimetype detection on filename. parser.add_argument('output_filename', help='Output filename.')
parser.add_argument(
""" '--' + READER,
if name is None: '-r',
name = mimetypes.guess_type(filename)[0] help='Choose the reader factory if it cannot be detected from extension, or if detection is wrong.'
)
if name in SHORTCUTS: parser.add_argument(
name = SHORTCUTS[name] '--' + WRITER,
'-w',
if name is None: help=
_, _ext = os.path.splitext(filename) 'Choose the writer factory if it cannot be detected from extension, or if detection is wrong (use - for console pretty print).'
if _ext: )
_ext = _ext[1:] parser.add_argument(
if _ext in SHORTCUTS: '--limit',
name = SHORTCUTS[_ext] '-l',
type=int,
if not name in REGISTRY: help='Adds a Limit() after the reader instance.',
raise RuntimeError( default=None,
'Could not resolve {factory_type} factory for {filename} ({name}). Try providing it explicitely using -{opt} <format>.'. )
format(name=name, filename=filename, factory_type=factory_type, opt=factory_type[0]) parser.add_argument(
'--transformation',
'-t',
dest='transformation',
action='append',
help='Add a transformation between input and output (can be used multiple times, order is preserved).',
)
parser.add_argument(
'--option',
'-O',
dest='option',
action='append',
help='Add a named option to both reader and writer factories (i.e. foo="bar").',
)
parser.add_argument(
'--' + READER + '-option',
'-' + READER[0].upper(),
dest=READER + '_option',
action='append',
help='Add a named option to the reader factory.',
)
parser.add_argument(
'--' + WRITER + '-option',
'-' + WRITER[0].upper(),
dest=WRITER + '_option',
action='append',
help='Add a named option to the writer factory.',
) )
if factory_type == READER: def handle(
return REGISTRY[name][0] self,
elif factory_type == WRITER: input_filename,
return REGISTRY[name][1] output_filename,
else: reader=None,
raise ValueError('Invalid factory type.') reader_option=None,
writer=None,
writer_option=None,
option=None,
limit=None,
transformation=None,
):
reader_factory = default_registry.get_reader_factory_for(input_filename, format=reader)
reader_kwargs = _resolve_options((option or []) + (reader_option or []))
if output_filename == '-':
writer_factory = bonobo.PrettyPrinter
writer_args = ()
else:
writer_factory = default_registry.get_writer_factory_for(output_filename, format=writer)
writer_args = (output_filename, )
writer_kwargs = _resolve_options((option or []) + (writer_option or []))
def execute(input, output, reader=None, reader_options=None, writer=None, writer_options=None, options=None): transformations = ()
reader = resolve_factory(reader, input, READER)(input)
writer = resolve_factory(writer, output, WRITER)(output)
graph = bonobo.Graph() if limit:
graph.add_chain(reader, writer) transformations += (bonobo.Limit(limit), )
return bonobo.run( transformations += _resolve_transformations(transformation)
graph, services={
'fs': bonobo.open_fs(),
}
)
graph = bonobo.Graph()
graph.add_chain(
reader_factory(input_filename, **reader_kwargs),
*transformations,
writer_factory(*writer_args, **writer_kwargs),
)
def register(parser): return bonobo.run(
parser.add_argument('input') graph, services={
parser.add_argument('output') 'fs': bonobo.open_fs(),
parser.add_argument('--' + READER, '-r') }
parser.add_argument('--' + WRITER, '-w') )
# parser.add_argument('--reader-option', '-ro', dest='reader_options')
# parser.add_argument('--writer-option', '-wo', dest='writer_options')
# parser.add_argument('--option', '-o', dest='options')
return execute

View File

@ -0,0 +1,33 @@
import io
import re
import requests
import bonobo
from bonobo.commands import BaseCommand
EXAMPLES_BASE_URL = 'https://raw.githubusercontent.com/python-bonobo/bonobo/master/bonobo/examples/'
"""The URL to our git repository, in raw mode."""
class DownloadCommand(BaseCommand):
    """
    CLI command that downloads a file from ``EXAMPLES_BASE_URL`` into the local examples directory.
    """

    def handle(self, *, path, **options):
        """
        Download ``path`` and save it at the matching location given by ``bonobo.get_examples_path``.

        :param str path: Relative path of the resource to download, which must start with ``examples/``.
        :param options: Other parsed options, ignored.
        :raises ValueError: If ``path`` does not start with ``examples/``.
        :raises IOError: From ``_open_url``, if the server does not answer with HTTP 200.
        """
        prefix = 'examples/'
        # Validate the very same prefix that gets stripped: checking only for 'examples' used to let values
        # such as "examples" or "examples_foo/bar" through, with the prefix still attached to the requested path.
        if not path.startswith(prefix):
            raise ValueError('Download command currently supports examples only')
        examples_path = path[len(prefix):]
        output_path = bonobo.get_examples_path(examples_path)
        with _open_url(EXAMPLES_BASE_URL + examples_path) as response, open(output_path, 'wb') as fout:
            for chunk in response.iter_content(io.DEFAULT_BUFFER_SIZE):
                fout.write(chunk)
        self.logger.info('Download saved to {}'.format(output_path))

    def add_arguments(self, parser):
        """Register the positional ``path`` argument."""
        parser.add_argument('path', help='The relative path of the thing to download.')
def _open_url(url):
    """
    Start a streaming HTTP GET request on ``url`` and return the ``requests`` response.

    The request is made with ``stream=True``, so the caller is expected to read the body (for example with
    ``iter_content``) and to close the response, typically by using it as a context manager.

    :param str url: The URL to fetch.
    :return: The response, when its status code is 200.
    :raises IOError: If the status code is anything but 200.
    """
    response = requests.get(url, stream=True)
    if response.status_code != 200:
        # A streamed response keeps its connection until the body is consumed or the response is closed, and
        # nobody else will get a chance to do it once we raise.
        response.close()
        raise IOError('Unable to download {}, HTTP {}'.format(url, response.status_code))
    return response

View File

@ -0,0 +1,24 @@
from bonobo.commands import BaseCommand
# Names of the runnable examples, relative to the ``bonobo.examples`` package (each one is displayed as
# ``python -m bonobo.examples.<name>`` by ExamplesCommand).
all_examples = (
'clock',
'datasets',
'environ',
'files.csv_handlers',
'files.json_handlers',
'files.pickle_handlers',
'files.text_handlers',
'types',
)
class ExamplesCommand(BaseCommand):
    """CLI command that lists the available examples along with the command line to run each of them."""

    def add_arguments(self, parser):
        """This command does not register any argument."""

    def handle(self):
        """Print a header followed by one ``python -m`` invocation per entry of ``all_examples``."""
        lines = ['You can run the following examples:', '']
        lines.extend(' $ python -m bonobo.examples.{}'.format(name) for name in all_examples)
        lines.append('')
        print('\n'.join(lines))

View File

@ -1,20 +1,74 @@
def execute(name, branch): import os
try:
from cookiecutter.main import cookiecutter
except ImportError as exc:
raise ImportError(
'You must install "cookiecutter" to use this command.\n\n $ pip install cookiecutter\n'
) from exc
return cookiecutter( from jinja2 import Environment, FileSystemLoader
'https://github.com/python-bonobo/cookiecutter-bonobo.git',
extra_context={'name': name}, from bonobo.commands import BaseCommand
no_input=True,
checkout=branch
)
def register(parser): class InitCommand(BaseCommand):
parser.add_argument('name') TEMPLATES = {'bare', 'default'}
parser.add_argument('--branch', '-b', default='master') TEMPLATES_PATH = os.path.join(os.path.dirname(__file__), 'templates')
return execute
def add_arguments(self, parser):
    """
    Register the command arguments on ``parser``.

    Declares the positional ``filename``, the ``--force`` flag, and a mutually exclusive choice between
    ``--template`` (one of ``TEMPLATES``, ``'default'`` if omitted) and ``--package``.
    """
    parser.add_argument('filename')
    parser.add_argument('--force', '-f', default=False, action='store_true')

    target_group = parser.add_mutually_exclusive_group(required=False)
    # TEMPLATES is a set: sort it so that usage, help and error messages always list the choices in the same
    # order, instead of depending on the set iteration order.
    target_group.add_argument('--template', '-t', choices=sorted(self.TEMPLATES), default='default')
    target_group.add_argument('--package', '-p', action='store_true', default=False)
def create_file_from_template(self, *, template, filename):
    """
    Render the ``<template>.py-tpl`` template found in ``TEMPLATES_PATH`` and write the result to ``filename``.

    The template is rendered with a single ``name`` variable: ``filename`` without its extension.

    :param str template: Name of the template, without the ``.py-tpl`` suffix.
    :param str filename: Path of the file to write, which must have a ``.py`` extension.
    :raises ValueError: If ``filename`` does not have a ``.py`` extension.
    """
    name, extension = os.path.splitext(filename)
    if extension != '.py':
        raise ValueError('Filenames should end with ".py".')

    environment = Environment(loader=FileSystemLoader(self.TEMPLATES_PATH))
    compiled = environment.get_template(template + '.py-tpl')

    with open(filename, 'w+') as output:
        output.write(compiled.render(name=name))

    self.logger.info('Generated {} using template {!r}.'.format(filename, template))
def create_package(self, *, filename):
    """
    Create a new package in the ``filename`` directory, then print how to install and run it.

    The project skeleton is generated by ``medikit.commands.handle_init``, then a ``__main__.py`` is rendered
    from the ``default`` template inside the package.

    :param str filename: Path of the package to create, which must not have an extension.
    :raises ValueError: If ``filename`` has an extension.
    :raises ImportError: If medikit cannot be imported.
    """
    _, extension = os.path.splitext(filename)
    if extension != '':
        raise ValueError('Package names should not have an extension.')

    try:
        import medikit.commands
    except ImportError as exc:
        raise ImportError(
            'To initialize a package, you need to install medikit (pip install --upgrade medikit).'
        ) from exc

    package_name = os.path.basename(filename)
    projectfile = os.path.join(os.getcwd(), filename, 'Projectfile')

    medikit.commands.handle_init(projectfile, name=package_name, requirements=['bonobo'])
    self.logger.info('Generated "{}" package with medikit.'.format(package_name))

    main_filename = os.path.join(filename, package_name, '__main__.py')
    self.create_file_from_template(template='default', filename=main_filename)

    # Empty strings are printed as blank separator lines.
    for line in (
        'Your "{}" package has been created.'.format(package_name),
        '',
        'Install it...',
        '',
        ' pip install --editable {}'.format(filename),
        '',
        'Then maybe run the example...',
        '',
        ' python -m {}'.format(package_name),
        '',
        'Enjoy!',
    ):
        print(line)
def handle(self, *, template, filename, package=False, force=False):
    """
    Generate either a package or a single file at ``filename``.

    :param str template: Template used when generating a single file.
    :param str filename: Target path.
    :param bool package: Generate a package (``template`` is then ignored) instead of a single file.
    :param bool force: Proceed even if ``filename`` already exists.
    :raises FileExistsError: If ``filename`` exists and ``force`` is not set.
    """
    if not force and os.path.exists(filename):
        raise FileExistsError('Target filename already exists, use --force to override.')

    if package:
        self.create_package(filename=filename)
        return

    self.create_file_from_template(template=template, filename=filename)

View File

@ -1,40 +1,15 @@
import json import bonobo
from bonobo.commands import BaseGraphCommand
from bonobo.commands.run import read, register_generic_run_arguments
from bonobo.constants import BEGIN
from bonobo.util.objects import get_name
OUTPUT_GRAPHVIZ = 'graphviz'
def _ident(graph, i): class InspectCommand(BaseGraphCommand):
escaped_index = str(i) handler = staticmethod(bonobo.inspect)
escaped_name = json.dumps(get_name(graph[i]))
return '{{{} [label={}]}}'.format(escaped_index, escaped_name)
def add_arguments(self, parser):
super(InspectCommand, self).add_arguments(parser)
parser.add_argument('--graph', '-g', dest='format', action='store_const', const='graph')
def execute(*, output, **kwargs): def parse_options(self, **options):
graph, plugins, services = read(**kwargs) if not options.get('format'):
raise RuntimeError('You must provide a format (try --graph).')
if output == OUTPUT_GRAPHVIZ: return options
print('digraph {')
print(' rankdir = LR;')
print(' "BEGIN" [shape="point"];')
for i in graph.outputs_of(BEGIN):
print(' "BEGIN" -> ' + _ident(graph, i) + ';')
for ix in graph.topologically_sorted_indexes:
for iy in graph.outputs_of(ix):
print(' {} -> {};'.format(_ident(graph, ix), _ident(graph, iy)))
print('}')
else:
raise NotImplementedError('Output type not implemented.')
def register(parser):
    """
    Declare the inspect command arguments on ``parser``.

    Adds the generic run arguments, then a ``--graph``/``-g`` flag storing the graphviz output type in
    ``output``, which is also the default.

    :return: the ``execute`` callable
    """
    register_generic_run_arguments(parser)
    graph_flag_options = dict(dest='output', action='store_const', const=OUTPUT_GRAPHVIZ)
    parser.add_argument('--graph', '-g', **graph_flag_options)
    parser.set_defaults(output=OUTPUT_GRAPHVIZ)
    return execute

View File

@ -1,29 +1,57 @@
import os import os
import bonobo import bonobo
from bonobo.constants import DEFAULT_SERVICES_ATTR, DEFAULT_SERVICES_FILENAME from bonobo.commands import BaseGraphCommand
DEFAULT_GRAPH_FILENAMES = ('__main__.py', 'main.py',)
DEFAULT_GRAPH_ATTR = 'get_graph'
def get_default_services(filename, services=None): class RunCommand(BaseGraphCommand):
dirname = os.path.dirname(filename) install = False
services_filename = os.path.join(dirname, DEFAULT_SERVICES_FILENAME) handler = staticmethod(bonobo.run)
if os.path.exists(services_filename):
with open(services_filename) as file:
code = compile(file.read(), services_filename, 'exec')
context = {
'__name__': '__bonobo__',
'__file__': services_filename,
}
exec(code, context)
return { def add_arguments(self, parser):
**context[DEFAULT_SERVICES_ATTR](), super(RunCommand, self).add_arguments(parser)
**(services or {}),
} verbosity_group = parser.add_mutually_exclusive_group()
return services or {} verbosity_group.add_argument('--quiet', '-q', action='store_true')
verbosity_group.add_argument('--verbose', '-v', action='store_true')
parser.add_argument('--install', '-I', action='store_true')
def parse_options(self, *, quiet=False, verbose=False, install=False, **options):
    """
    Consume the run-specific flags and hand back the remaining options.

    ``quiet`` and ``verbose`` are pushed to the ``QUIET`` and ``DEBUG`` settings through ``set_if_true``;
    ``install`` is remembered on the instance.

    :return: the options that were not consumed here
    """
    from bonobo import settings

    # Same order as the flags are declared: QUIET first, then DEBUG.
    for setting_name, enabled in (('QUIET', quiet), ('DEBUG', verbose)):
        getattr(settings, setting_name).set_if_true(enabled)

    self.install = install
    return options
def _run_path(self, file):
    """
    Run ``file`` through the parent implementation, installing requirements first when requested.

    When ``self.install`` is set, ``requirements.txt`` is looked up in ``file`` itself if it is a
    directory, otherwise in the directory containing it.
    """
    if self.install:
        base_dir = file if os.path.isdir(file) else os.path.dirname(file)
        _install_requirements(os.path.join(base_dir, 'requirements.txt'))

    return super()._run_path(file)
def _run_module(self, mod):
# install not implemented for a module, not sure it even make sense.
if self.install:
raise RuntimeError('--install behaviour when running a module is not defined.')
return super()._run_module(mod)
def register_generic_run_arguments(parser, required=True):
    """
    Backward compatibility shim for third party extensions.

    Builds a throwaway ``BaseGraphCommand``, sets its ``required`` attribute and lets it declare its
    arguments on ``parser``.

    TODO: This should be deprecated (using the @deprecated decorator) in 0.7, and removed in 0.8 or 0.9.

    :return: the same ``parser`` object
    """
    compat_command = BaseGraphCommand()
    compat_command.required = required
    compat_command.add_arguments(parser)
    return parser
def _install_requirements(requirements): def _install_requirements(requirements):
@ -38,82 +66,3 @@ def _install_requirements(requirements):
pip.utils.pkg_resources = importlib.reload(pip.utils.pkg_resources) pip.utils.pkg_resources = importlib.reload(pip.utils.pkg_resources)
import site import site
importlib.reload(site) importlib.reload(site)
def read(filename, module, install=False, quiet=False, verbose=False, env=None):
    """
    Load a graph definition from a file, a directory or a module.

    :param filename: path to a python file, or to a directory containing one of DEFAULT_GRAPH_FILENAMES
    :param module: module name, used only when ``filename`` is empty
    :param install: when true, install the ``requirements.txt`` found next to the entrypoint first
    :param quiet: when true, turn the QUIET setting on
    :param verbose: when true, turn the DEBUG setting on
    :param env: iterable of ``NAME=value`` strings exported to ``os.environ`` (quotes are stripped from values)
    :return: (graph, plugins, services) tuple
    :raises IOError: if ``filename`` is a directory without any known entrypoint
    :raises RuntimeError: if neither ``filename`` nor ``module`` is provided
    """
    import re
    import runpy
    from bonobo import Graph, settings

    if quiet:
        settings.QUIET.set(True)

    if verbose:
        settings.DEBUG.set(True)

    if env:
        quote_killer = re.compile('["\']')
        for e in env:
            # Split on the first "=" only, so that values may themselves contain "=" (e.g. URLs with a query).
            var_name, var_value = e.split('=', 1)
            os.environ[var_name] = quote_killer.sub('', var_value)

    if filename:
        if os.path.isdir(filename):
            if install:
                requirements = os.path.join(filename, 'requirements.txt')
                _install_requirements(requirements)

            # Look for the first existing entrypoint candidate within the directory.
            pathname = filename
            for filename in DEFAULT_GRAPH_FILENAMES:
                filename = os.path.join(pathname, filename)
                if os.path.exists(filename):
                    break
            if not os.path.exists(filename):
                raise IOError('Could not find entrypoint (candidates: {}).'.format(', '.join(DEFAULT_GRAPH_FILENAMES)))
        elif install:
            requirements = os.path.join(os.path.dirname(filename), 'requirements.txt')
            _install_requirements(requirements)
        context = runpy.run_path(filename, run_name='__bonobo__')
    elif module:
        context = runpy.run_module(module, run_name='__bonobo__')
        filename = context['__file__']
    else:
        raise RuntimeError('UNEXPECTED: argparse should not allow this.')

    # Every Graph instance found in the executed namespace is a candidate.
    graphs = dict((k, v) for k, v in context.items() if isinstance(v, Graph))

    assert len(graphs) == 1, (
        'Having zero or more than one graph definition in one file is unsupported for now, '
        'but it is something that will be implemented in the future.\n\nExpected: 1, got: {}.'
    ).format(len(graphs))

    graph = list(graphs.values())[0]
    plugins = []
    services = get_default_services(
        filename, context.get(DEFAULT_SERVICES_ATTR)() if DEFAULT_SERVICES_ATTR in context else None
    )

    return graph, plugins, services
def execute(filename, module, install=False, quiet=False, verbose=False, env=None):
    """
    Read the graph designated by ``filename`` or ``module`` and run it.

    All arguments are forwarded to ``read``.

    :return: whatever ``bonobo.run`` returns
    """
    graph, plugins, services = read(
        filename,
        module,
        install=install,
        quiet=quiet,
        verbose=verbose,
        env=env,
    )
    return bonobo.run(graph, plugins=plugins, services=services)
def register_generic_run_arguments(parser, required=True):
    """
    Declare the arguments shared by commands that need a graph source.

    The source is either a positional ``filename`` or ``--module``/``-m``, mutually exclusive; ``--env``/``-e``
    can be repeated and accumulates into a list.

    :param parser: argparse-like parser to configure
    :param required: whether one of the two sources must be given
    :return: the same ``parser`` object
    """
    graph_source = parser.add_mutually_exclusive_group(required=required)
    graph_source.add_argument('filename', nargs='?', type=str)
    graph_source.add_argument('--module', '-m', type=str)

    parser.add_argument('--env', '-e', action='append')
    return parser
def register(parser):
    """
    Declare the run command arguments on ``parser``.

    On top of the generic run arguments, adds the mutually exclusive ``--quiet``/``-q`` and
    ``--verbose``/``-v`` flags, and the ``--install``/``-I`` flag.

    :return: the ``execute`` callable
    """
    parser = register_generic_run_arguments(parser)

    verbosity = parser.add_mutually_exclusive_group()
    for long_flag, short_flag in (('--quiet', '-q'), ('--verbose', '-v')):
        verbosity.add_argument(long_flag, short_flag, action='store_true')

    parser.add_argument('--install', '-I', action='store_true')
    return execute

View File

@ -0,0 +1,15 @@
import bonobo
def get_graph(**options):
    """
    Build the (empty) graph to execute.

    :return: bonobo.Graph
    """
    return bonobo.Graph()
def get_services(**options):
    """
    Build the services dictionary; this bare template defines none.

    :return: dict
    """
    return dict()
# Script entrypoint: parse the command line, then build and run the graph with its services.
if __name__ == '__main__':
    with bonobo.parse_args() as options:
        graph = get_graph(**options)
        services = get_services(**options)
        bonobo.run(graph, services=services)

View File

@ -0,0 +1,55 @@
import bonobo
def extract():
    """Placeholder extractor yielding two words; change, rename or remove it."""
    yield from ('hello', 'world')
def transform(*args):
    """Placeholder transformation yielding one tuple with every argument title-cased; change, rename or remove it."""
    titled = tuple(str.title(value) for value in args)
    yield titled
def load(*args):
    """Placeholder loader that prints its arguments to standard output; change, rename or remove it."""
    print(*args)
def get_graph(**options):
    """
    Build the graph that needs to be executed: a single extract -> transform -> load chain.

    :return: bonobo.Graph
    """
    pipeline = bonobo.Graph()
    pipeline.add_chain(extract, transform, load)
    return pipeline
def get_services(**options):
    """
    Build the services dictionary, a plain dict of names-to-implementations used by bonobo for runtime
    injection.

    It is used on top of the defaults provided by bonobo (fs, http, ...). You can override those defaults, or
    just let the framework define them. You can also define your own services, and naming is up to you.

    :return: dict
    """
    return dict()
# The __main__ block actually executes the graph.
if __name__ == '__main__':
    parser = bonobo.get_argument_parser()
    with bonobo.parse_args(parser) as options:
        # Build the graph first, then the services, both from the parsed options.
        graph = get_graph(**options)
        services = get_services(**options)
        bonobo.run(graph, services=services)

View File

@ -1,4 +1,36 @@
def format_version(mod, *, name=None, quiet=False): from bonobo.commands import BaseCommand
def get_versions(*, all=False, quiet=None):
    """
    Yield one formatted version line per package.

    The first line is always bonobo's own. When ``all`` is set, one more line follows for each other name in
    ``bonobo_packages`` (sorted): its formatted version, or a message when it cannot be imported or formatted.

    :param all: also report the other bonobo packages
    :param quiet: forwarded to ``_format_version``
    """
    import bonobo
    from bonobo.util.pkgs import bonobo_packages

    yield _format_version(bonobo, quiet=quiet)

    if not all:
        return

    for package_name in sorted(bonobo_packages):
        if package_name == 'bonobo':
            # Already reported above.
            continue
        try:
            # Distribution names may contain dashes; import names use underscores.
            module = __import__(package_name.replace('-', '_'))
            try:
                yield _format_version(module, name=package_name, quiet=quiet)
            except Exception as exc:
                yield '{} ({})'.format(package_name, exc)
        except ImportError as exc:
            yield '{} is not importable ({}).'.format(package_name, exc)
class VersionCommand(BaseCommand):
    """Command printing the version lines produced by ``get_versions``."""

    def add_arguments(self, parser):
        """Declare ``--all``/``-a`` (boolean flag) and ``--quiet``/``-q`` (counted flag)."""
        parser.add_argument('--all', '-a', action='store_true')
        parser.add_argument('--quiet', '-q', action='count')

    def handle(self, *, all=False, quiet=False):
        """Print each version line as soon as it is produced."""
        for version_line in get_versions(all=all, quiet=quiet):
            print(version_line)
def _format_version(mod, *, name=None, quiet=False):
from bonobo.util.pkgs import bonobo_packages from bonobo.util.pkgs import bonobo_packages
args = { args = {
'name': name or mod.__name__, 'name': name or mod.__name__,
@ -14,27 +46,3 @@ def format_version(mod, *, name=None, quiet=False):
return '{version}'.format(**args) return '{version}'.format(**args)
raise RuntimeError('Hard to be so quiet...') raise RuntimeError('Hard to be so quiet...')
def execute(all=False, quiet=False):
    """
    Print bonobo's version and, when ``all`` is set, one line per other package in ``bonobo_packages``.

    :param all: also report the other bonobo packages
    :param quiet: forwarded to ``format_version``
    """
    import bonobo
    from bonobo.util.pkgs import bonobo_packages

    print(format_version(bonobo, quiet=quiet))

    if not all:
        return

    for package_name in sorted(bonobo_packages):
        if package_name == 'bonobo':
            # Already printed above.
            continue
        try:
            # Distribution names may contain dashes; import names use underscores.
            module = __import__(package_name.replace('-', '_'))
            try:
                print(format_version(module, name=package_name, quiet=quiet))
            except Exception as exc:
                print('{} ({})'.format(package_name, exc))
        except ImportError as exc:
            print('{} is not importable ({}).'.format(package_name, exc))
def register(parser):
    """
    Declare ``--all``/``-a`` (boolean flag) and ``--quiet``/``-q`` (counted flag) on ``parser``.

    :return: the ``execute`` callable
    """
    flag_specs = (
        (('--all', '-a'), 'store_true'),
        (('--quiet', '-q'), 'count'),
    )
    for flags, action in flag_specs:
        parser.add_argument(*flags, action=action)
    return execute

View File

@ -1,9 +1,11 @@
from bonobo.config.configurables import Configurable from bonobo.config.configurables import Configurable
from bonobo.config.functools import transformation_factory
from bonobo.config.options import Method, Option from bonobo.config.options import Method, Option
from bonobo.config.processors import ContextProcessor from bonobo.config.processors import ContextProcessor, use_context, use_context_processor, use_raw_input, use_no_input
from bonobo.config.services import Container, Exclusive, Service, requires, create_container from bonobo.config.services import Container, Exclusive, Service, use, create_container
from bonobo.util import deprecated_alias
use = requires requires = deprecated_alias('requires', use)
# Bonobo's Config API # Bonobo's Config API
__all__ = [ __all__ = [
@ -16,5 +18,10 @@ __all__ = [
'Service', 'Service',
'create_container', 'create_container',
'requires', 'requires',
'transformation_factory',
'use', 'use',
'use_context',
'use_context_processor',
'use_no_input',
'use_raw_input',
] ]

View File

@ -3,7 +3,6 @@ from bonobo.util import isoption, iscontextprocessor, sortedlist, get_name
__all__ = [ __all__ = [
'Configurable', 'Configurable',
'Option',
] ]
get_creation_counter = lambda v: v._creation_counter get_creation_counter = lambda v: v._creation_counter
@ -18,6 +17,7 @@ class ConfigurableMeta(type):
super().__init__(what, bases, dict) super().__init__(what, bases, dict)
cls.__processors = sortedlist() cls.__processors = sortedlist()
cls.__processors_cache = None
cls.__methods = sortedlist() cls.__methods = sortedlist()
cls.__options = sortedlist() cls.__options = sortedlist()
cls.__names = set() cls.__names = set()
@ -67,7 +67,9 @@ class ConfigurableMeta(type):
@property @property
def __processors__(cls): def __processors__(cls):
return (processor for _, processor in cls.__processors) if cls.__processors_cache is None:
cls.__processors_cache = [processor for _, processor in cls.__processors]
return cls.__processors_cache
def __repr__(self): def __repr__(self):
return ' '.join(( return ' '.join((
@ -85,7 +87,7 @@ except:
else: else:
class PartiallyConfigured(_functools.partial): class PartiallyConfigured(_functools.partial):
@property # TODO XXX cache this shit @property # TODO XXX cache this
def _options_values(self): def _options_values(self):
""" Simulate option values for partially configured objects. """ """ Simulate option values for partially configured objects. """
try: try:
@ -162,8 +164,8 @@ class Configurable(metaclass=ConfigurableMeta):
if len(extraneous): if len(extraneous):
raise TypeError( raise TypeError(
'{}() got {} unexpected option{}: {}.'.format( '{}() got {} unexpected option{}: {}.'.format(
cls.__name__, cls.__name__, len(extraneous), 's'
len(extraneous), 's' if len(extraneous) > 1 else '', ', '.join(map(repr, sorted(extraneous))) if len(extraneous) > 1 else '', ', '.join(map(repr, sorted(extraneous)))
) )
) )
@ -173,8 +175,8 @@ class Configurable(metaclass=ConfigurableMeta):
if _final: if _final:
raise TypeError( raise TypeError(
'{}() missing {} required option{}: {}.'.format( '{}() missing {} required option{}: {}.'.format(
cls.__name__, cls.__name__, len(missing), 's'
len(missing), 's' if len(missing) > 1 else '', ', '.join(map(repr, sorted(missing))) if len(missing) > 1 else '', ', '.join(map(repr, sorted(missing)))
) )
) )
return PartiallyConfigured(cls, *args, **kwargs) return PartiallyConfigured(cls, *args, **kwargs)
@ -209,9 +211,7 @@ class Configurable(metaclass=ConfigurableMeta):
position += 1 position += 1
def __call__(self, *args, **kwargs): def __call__(self, *args, **kwargs):
""" You can implement a configurable callable behaviour by implemenenting the call(...) method. Of course, it is also backward compatible with legacy __call__ override. raise AbstractError(self.__call__)
"""
return self.call(*args, **kwargs)
@property @property
def __options__(self): def __options__(self):
@ -220,6 +220,3 @@ class Configurable(metaclass=ConfigurableMeta):
@property @property
def __processors__(self): def __processors__(self):
return type(self).__processors__ return type(self).__processors__
def call(self, *args, **kwargs):
raise AbstractError('Not implemented.')

View File

@ -0,0 +1,17 @@
import functools
import itertools
def transformation_factory(f):
    """
    Decorate a factory so that whatever it returns is renamed after the call that produced it.

    The returned object gets ``__name__`` set to ``<factory name>(<args reprs>, <key>=<value repr>, ...)``.
    The wrapper itself carries a ``_partial = True`` attribute.

    :param f: factory callable, whose return value must accept a ``__name__`` assignment
    :return: the wrapped factory
    """

    @functools.wraps(f)
    def _transformation_factory(*args, **kwargs):
        transformation = f(*args, **kwargs)

        # Describe the call: positional arguments first, then keyword arguments.
        positional = (repr(arg) for arg in args)
        named = ('{}={!r}'.format(key, value) for key, value in kwargs.items())
        call_signature = ', '.join(itertools.chain(positional, named))

        transformation.__name__ = f.__name__ + '(' + call_signature + ')'
        return transformation

    _transformation_factory._partial = True

    return _transformation_factory

View File

@ -1,4 +1,5 @@
from textwrap import dedent import textwrap
import types
from bonobo.util.inspect import istype from bonobo.util.inspect import istype
@ -46,7 +47,7 @@ class Option:
title = Option(str, required=True, positional=True) title = Option(str, required=True, positional=True)
keyword = Option(str, default='foo') keyword = Option(str, default='foo')
def call(self, s): def __call__(self, s):
return self.title + ': ' + s + ' (' + self.keyword + ')' return self.title + ': ' + s + ' (' + self.keyword + ')'
example = Example('hello', keyword='bar') example = Example('hello', keyword='bar')
@ -65,7 +66,7 @@ class Option:
# Docstring formating # Docstring formating
self.__doc__ = __doc__ or None self.__doc__ = __doc__ or None
if self.__doc__: if self.__doc__:
self.__doc__ = dedent(self.__doc__.strip('\n')).strip() self.__doc__ = textwrap.dedent(self.__doc__.strip('\n')).strip()
if default: if default:
self.__doc__ += '\nDefault: {!r}'.format(default) self.__doc__ += '\nDefault: {!r}'.format(default)
@ -103,6 +104,40 @@ class Option:
return self.default() if callable(self.default) else self.default return self.default() if callable(self.default) else self.default
class RemovedOption(Option):
    """
    Option kept only for compatibility: it is never required and can only hold one fixed value.

    :param value: the only accepted value, also returned as the default
    """

    def __init__(self, *args, value, **kwargs):
        # Whatever the caller asked for, a removed option is not required.
        super().__init__(*args, **dict(kwargs, required=False))
        self.value = value

    def get_default(self):
        """Return the fixed value."""
        return self.value

    def clean(self, value):
        """
        Accept ``value`` only if it equals the fixed value.

        :raises ValueError: if ``value`` differs from the fixed value
        """
        if value != self.value:
            message = (
                'Removed options cannot change value, {!r} must now be {!r} (and you should remove setting the value explicitely, as it is deprecated and will be removed quite soon.'
            ).format(self.name, self.value)
            raise ValueError(message)
        return self.value
class RenamedOption(Option):
    """
    Placeholder for an option that was renamed: reading or setting it raises, pointing at the new name.

    :param target: name of the option to use instead
    """

    def __init__(self, target, *, positional=False):
        # NOTE(review): `positional` is accepted but not forwarded; the option is always declared
        # non-positional. Confirm this is intended.
        super().__init__(required=False, positional=False)
        self.target = target

    def __get__(self, instance, owner):
        """Always raise ValueError, naming the option to read instead."""
        message = 'Trying to get value from renamed option {}, try getting {} instead.'
        raise ValueError(message.format(self.name, self.target))

    def clean(self, value):
        """Always raise ValueError, naming the option to set instead."""
        message = 'Trying to set value of renamed option {}, try setting {} instead.'
        raise ValueError(message.format(self.name, self.target))
class Method(Option): class Method(Option):
""" """
A Method is a special callable-valued option, that can be used in three different ways (but for same purpose). A Method is a special callable-valued option, that can be used in three different ways (but for same purpose).
@ -132,20 +167,47 @@ class Method(Option):
>>> example3 = OtherChildMethodExample() >>> example3 = OtherChildMethodExample()
It's possible to pass a default implementation to a Method by calling it, making it suitable to use as a decorator.
>>> class MethodExampleWithDefault(Configurable):
... @Method()
... def handler(self):
... pass
""" """
def __init__(self, *, required=True, positional=True): def __init__(self, *, default=None, required=True, positional=True, __doc__=None):
super().__init__(None, required=required, positional=positional) super().__init__(None, required=required, positional=positional, __doc__=__doc__)
# If a callable is provided as default, then use self as if it was used as a decorator
if default is not None:
if not callable(default):
raise ValueError('Method defaults should be callable, if provided.')
self(default)
def __get__(self, inst, type_):
x = super(Method, self).__get__(inst, type_)
if inst:
x = types.MethodType(x, inst)
return x
def __set__(self, inst, value): def __set__(self, inst, value):
if not hasattr(value, '__call__'): if not callable(value):
raise TypeError( raise TypeError(
'Option of type {!r} is expecting a callable value, got {!r} object (which is not).'.format( 'Option {!r} ({}) is expecting a callable value, got {!r} object: {!r}.'.format(
type(self).__name__, type(value).__name__ self.name,
type(self).__name__,
type(value).__name__, value
) )
) )
inst._options_values[self.name] = self.type(value) if self.type else value inst._options_values[self.name] = self.type(value) if self.type else value
def __call__(self, *args, **kwargs): def __call__(self, impl):
# only here to trick IDEs into thinking this is callable. if self.default:
raise NotImplementedError('You cannot call the descriptor') raise RuntimeError('Can only be used once as a decorator.')
self.default = impl
self.required = False
return self
def get_default(self):
return self.default

View File

@ -1,11 +1,17 @@
from collections import Iterable from collections import Iterable
from contextlib import contextmanager from contextlib import contextmanager
from functools import partial
from inspect import signature
from bonobo.config.options import Option from bonobo.config import Option
from bonobo.util.compat import deprecated_alias from bonobo.errors import UnrecoverableTypeError
from bonobo.util.iterators import ensure_tuple from bonobo.util import deprecated_alias, ensure_tuple
_CONTEXT_PROCESSORS_ATTR = '__processors__' _raw = object()
_args = object()
_none = object()
INPUT_FORMATS = {_raw, _args, _none}
class ContextProcessor(Option): class ContextProcessor(Option):
@ -16,15 +22,15 @@ class ContextProcessor(Option):
It works like a yielding context manager, and is the recommended way to setup and teardown objects you'll need It works like a yielding context manager, and is the recommended way to setup and teardown objects you'll need
in the context of one execution. It's the way to overcome the stateless nature of transformations. in the context of one execution. It's the way to overcome the stateless nature of transformations.
The yielded values will be passed as positional arguments to the next context processors (order do matter), and The yielded values will be passed as positional arguments to the next context processors (order does matter), and
finally to the __call__ method of the transformation. finally to the __call__ method of the transformation.
Warning: this may change for a similar but simpler implementation, don't relly too much on it (yet). Warning: this may change for a similar but simpler implementation, don't rely too much on it (yet).
Example: Example:
>>> from bonobo.config import Configurable >>> from bonobo.config import Configurable
>>> from bonobo.util.objects import ValueHolder >>> from bonobo.util import ValueHolder
>>> class Counter(Configurable): >>> class Counter(Configurable):
... @ContextProcessor ... @ContextProcessor
@ -52,18 +58,11 @@ class ContextProcessor(Option):
def __call__(self, *args, **kwargs): def __call__(self, *args, **kwargs):
return self.func(*args, **kwargs) return self.func(*args, **kwargs)
@classmethod
def decorate(cls, cls_or_func):
try:
cls_or_func.__processors__
except AttributeError:
cls_or_func.__processors__ = []
def decorator(processor, cls_or_func=cls_or_func): class bound(partial):
cls_or_func.__processors__.append(cls(processor)) @property
return cls_or_func def kwargs(self):
return self.keywords
return decorator
class ContextCurrifier: class ContextCurrifier:
@ -71,18 +70,47 @@ class ContextCurrifier:
This is a helper to resolve processors. This is a helper to resolve processors.
""" """
def __init__(self, wrapped, *initial_context): def __init__(self, wrapped, *args, **kwargs):
self.wrapped = wrapped self.wrapped = wrapped
self.context = tuple(initial_context) self.args = args
self.kwargs = kwargs
self.format = getattr(wrapped, '__input_format__', _args)
self._stack, self._stack_values = None, None self._stack, self._stack_values = None, None
def __iter__(self): def __iter__(self):
yield from self.wrapped yield from self.wrapped
def __call__(self, *args, **kwargs): def _bind(self, _input):
if not callable(self.wrapped) and isinstance(self.wrapped, Iterable): try:
return self.__iter__() bind = signature(self.wrapped).bind
return self.wrapped(*self.context, *args, **kwargs) except ValueError:
bind = partial(bound, self.wrapped)
if self.format is _args:
return bind(*self.args, *_input, **self.kwargs)
if self.format is _raw:
return bind(*self.args, _input, **self.kwargs)
if self.format is _none:
return bind(*self.args, **self.kwargs)
raise NotImplementedError('Invalid format {!r}.'.format(self.format))
def __call__(self, _input):
if not callable(self.wrapped):
if isinstance(self.wrapped, Iterable):
return self.__iter__()
raise UnrecoverableTypeError('Uncallable node {}'.format(self.wrapped))
try:
bound = self._bind(_input)
except TypeError as exc:
raise UnrecoverableTypeError((
'Input of {wrapped!r} does not bind to the node signature.\n'
'Args: {args}\n'
'Input: {input}\n'
'Kwargs: {kwargs}\n'
'Signature: {sig}'
).format(
wrapped=self.wrapped, args=self.args, input=_input, kwargs=self.kwargs, sig=signature(self.wrapped)
)) from exc
return self.wrapped(*bound.args, **bound.kwargs)
def setup(self, *context): def setup(self, *context):
if self._stack is not None: if self._stack is not None:
@ -90,11 +118,11 @@ class ContextCurrifier:
self._stack, self._stack_values = list(), list() self._stack, self._stack_values = list(), list()
for processor in resolve_processors(self.wrapped): for processor in resolve_processors(self.wrapped):
_processed = processor(self.wrapped, *context, *self.context) _processed = processor(self.wrapped, *context, *self.args, **self.kwargs)
_append_to_context = next(_processed) _append_to_context = next(_processed)
self._stack_values.append(_append_to_context) self._stack_values.append(_append_to_context)
if _append_to_context is not None: if _append_to_context is not None:
self.context += ensure_tuple(_append_to_context) self.args += ensure_tuple(_append_to_context)
self._stack.append(_processed) self._stack.append(_processed)
def teardown(self): def teardown(self):
@ -137,3 +165,42 @@ def resolve_processors(mixed):
get_context_processors = deprecated_alias('get_context_processors', resolve_processors) get_context_processors = deprecated_alias('get_context_processors', resolve_processors)
def use_context(f):
    """
    Decorator registering, on ``f``, a context processor that yields the context it receives.

    :param f: class or function to decorate
    :return: ``f``, as returned by ``use_context_processor``
    """

    def context(self, context, *args, **kwargs):
        yield context

    decorate = use_context_processor(context)
    return decorate(f)
def use_context_processor(context_processor):
    """
    Build a decorator that appends ``context_processor`` to the ``__processors__`` of its target.

    :param context_processor: callable wrapped in a ``ContextProcessor`` when the decorator is applied
    :return: decorator taking a class or function and returning it
    """

    def using_context_processor(cls_or_func):
        # Create the processors list on first use.
        if not hasattr(cls_or_func, '__processors__'):
            cls_or_func.__processors__ = []

        cls_or_func.__processors__.append(ContextProcessor(context_processor))
        return cls_or_func

    return using_context_processor
def _use_input_format(input_format):
    """
    Build a decorator that tags its target with the given input format.

    :param input_format: one of the sentinels contained in ``INPUT_FORMATS``
    :return: decorator setting ``__input_format__`` on its target and returning it
    :raises ValueError: if ``input_format`` is not a known format
    """
    if input_format not in INPUT_FORMATS:
        # The formats are bare object() sentinels: they are neither orderable nor strings, so they are
        # converted with repr() before being sorted and joined. Otherwise this error path would raise
        # TypeError instead of the intended ValueError.
        raise ValueError(
            'Invalid input format {!r}. Choices: {}'.format(
                input_format, ', '.join(sorted(map(repr, INPUT_FORMATS)))
            )
        )

    def _set_input_format(f):
        setattr(f, '__input_format__', input_format)
        return f

    return _set_input_format
# Decorator tagging a callable with the `_none` input format (stored as `__input_format__`).
use_no_input = _use_input_format(_none)
# Decorator tagging a callable with the `_raw` input format (stored as `__input_format__`).
use_raw_input = _use_input_format(_raw)

View File

@ -1,3 +1,5 @@
import inspect
import pprint
import re import re
import threading import threading
import types import types
@ -73,13 +75,13 @@ class Container(dict):
return cls return cls
return super().__new__(cls, *args, **kwargs) return super().__new__(cls, *args, **kwargs)
def args_for(self, mixed): def kwargs_for(self, mixed):
try: try:
options = dict(mixed.__options__) options = dict(mixed.__options__)
except AttributeError: except AttributeError:
options = {} options = {}
return tuple(option.resolve(mixed, self) for name, option in options.items() if isinstance(option, Service)) return {name: option.resolve(mixed, self) for name, option in options.items() if isinstance(option, Service)}
def get(self, name, default=None): def get(self, name, default=None):
if not name in self: if not name in self:
@ -156,7 +158,7 @@ class Exclusive(ContextDecorator):
self.get_lock().release() self.get_lock().release()
def requires(*service_names): def use(*service_names):
def decorate(mixed): def decorate(mixed):
try: try:
options = mixed.__options__ options = mixed.__options__

View File

@ -1,9 +1,29 @@
from bonobo.structs.tokens import Token class Token:
"""Factory for signal oriented queue messages or other token types."""
def __init__(self, name):
self.__name__ = name
def __repr__(self):
return '<{}>'.format(self.__name__)
BEGIN = Token('Begin') BEGIN = Token('Begin')
END = Token('End') END = Token('End')
INHERIT_INPUT = Token('InheritInput')
LOOPBACK = Token('Loopback')
NOT_MODIFIED = Token('NotModified') class Flag(Token):
DEFAULT_SERVICES_FILENAME = '_services.py' must_be_first = False
DEFAULT_SERVICES_ATTR = 'get_services' must_be_last = False
allows_data = True
INHERIT = Flag('Inherit')
NOT_MODIFIED = Flag('NotModified')
NOT_MODIFIED.must_be_first = True
NOT_MODIFIED.must_be_last = True
NOT_MODIFIED.allows_data = False
EMPTY = tuple()
TICK_PERIOD = 0.2

View File

@ -0,0 +1,7 @@
from .utils import create_or_update
from .commands import ETLCommand
__all__ = [
'ETLCommand',
'create_or_update',
]

View File

@ -0,0 +1,54 @@
from logging import getLogger
import bonobo
from bonobo.plugins.console import ConsoleOutputPlugin
from bonobo.util.term import CLEAR_EOL
from colorama import Fore, Back, Style
from django.core.management import BaseCommand
from django.core.management.base import OutputWrapper
from .utils import create_or_update
class ETLCommand(BaseCommand):
    """
    Django management command base that builds a bonobo graph and runs it.

    Subclasses are expected to override ``get_graph`` (and optionally ``get_services``).
    """

    @property
    def logger(self):
        """Logger named after the module of the concrete command class, created on first access."""
        try:
            return self._logger
        except AttributeError:
            self._logger = getLogger(type(self).__module__)
            return self._logger

    # Exposed on the command so that subclasses can call it as self.create_or_update(...).
    create_or_update = staticmethod(create_or_update)

    def create_parser(self, prog_name, subcommand):
        """Return the parent's parser, passed through ``bonobo.get_argument_parser``."""
        return bonobo.get_argument_parser(super().create_parser(prog_name, subcommand))

    def get_graph(self, *args, **options):
        """
        Build the graph to run. The default graph contains a single node raising NotImplementedError.

        :return: bonobo.Graph
        """

        def not_implemented():
            raise NotImplementedError('You must implement {}.get_graph() method.'.format(self))

        return bonobo.Graph(not_implemented)

    def get_services(self):
        """
        Build the services passed to ``bonobo.run``; none by default.

        :return: dict
        """
        return {}

    def info(self, *args, **kwargs):
        """Log at INFO level through the command's logger."""
        self.logger.info(*args, **kwargs)

    def handle(self, *args, **options):
        """
        Run the graph, with the command's stdout/stderr temporarily rerouted to the console plugin streams.

        :return: a string containing the value returned by ``bonobo.run``
        """
        _stdout_backup, _stderr_backup = self.stdout, self.stderr

        try:
            self.stdout = OutputWrapper(ConsoleOutputPlugin._stdout, ending=CLEAR_EOL + '\n')
            self.stderr = OutputWrapper(ConsoleOutputPlugin._stderr, ending=CLEAR_EOL + '\n')
            self.stderr.style_func = lambda x: Fore.LIGHTRED_EX + Back.RED + '!' + Style.RESET_ALL + ' ' + x

            with bonobo.parse_args(options) as options:
                result = bonobo.run(
                    self.get_graph(*args, **options),
                    services=self.get_services(),
                )
        finally:
            # Always give the original streams back, even if building or running the graph raised.
            self.stdout, self.stderr = _stdout_backup, _stderr_backup

        return '\nReturn Value: ' + str(result)

View File

@ -0,0 +1,24 @@
def create_or_update(model, *, defaults=None, save=True, **kwargs):
    """
    Create or update a django model instance.

    The instance is fetched or created through ``get_or_create`` on the model's default manager. When it
    already existed, every attribute whose current value differs from ``defaults`` is overwritten.

    :param model: model class
    :param defaults: mapping of attribute names to wanted values
    :param save: whether to call ``save()`` on the instance when at least one attribute changed
    :param kwargs: lookup passed to ``get_or_create``
    :return: object, created, updated
    """
    obj, created = model._default_manager.get_or_create(defaults=defaults, **kwargs)

    # Nothing to compare for a freshly created object, or without defaults.
    if created or not defaults:
        return obj, created, False

    updated = False
    for field_name, wanted in defaults.items():
        if getattr(obj, field_name) != wanted:
            setattr(obj, field_name, wanted)
            updated = True

    if updated and save:
        obj.save()

    return obj, created, updated

View File

@ -0,0 +1,55 @@
import os
# https://developers.google.com/api-client-library/python/guide/aaa_oauth
# pip install google-api-python-client (1.6.4)
import httplib2
from apiclient import discovery
from oauth2client import client, tools
from oauth2client.file import Storage
from oauth2client.tools import argparser
# Current user's home directory, as expanded by os.path.expanduser.
HOME_DIR = os.path.expanduser('~')
# Path of the client secrets file handed to client.flow_from_clientsecrets() in get_credentials().
GOOGLE_SECRETS = os.path.join(HOME_DIR, '.cache/secrets/client_secrets.json')
def get_credentials(*, scopes):
    """Gets valid user credentials from storage.

    If nothing has been stored, or if the stored credentials are invalid or do not have the requested
    scopes, the OAuth2 flow is completed to obtain the new credentials.

    Credentials are stored in ``googleapis.json``, under ``~/.cache/<package>/credentials``.

    :param scopes: scopes the credentials must have, also used to build the OAuth2 flow

    Returns:
        Credentials, the obtained credential.
    """
    credential_dir = os.path.join(HOME_DIR, '.cache', __package__, 'credentials')
    # exist_ok avoids the check-then-create race when two processes start at the same time.
    os.makedirs(credential_dir, exist_ok=True)
    credential_path = os.path.join(credential_dir, 'googleapis.json')

    store = Storage(credential_path)
    credentials = store.get()

    # see https://developers.google.com/api-client-library/python/auth/web-app
    # kw: "incremental scopes"
    if not credentials or credentials.invalid or not credentials.has_scopes(scopes):
        flow = client.flow_from_clientsecrets(GOOGLE_SECRETS, scopes)
        flow.user_agent = 'Bonobo ETL (https://www.bonobo-project.org/)'
        flags = argparser.parse_args(['--noauth_local_webserver'])
        credentials = tools.run_flow(flow, store, flags)
        # NOTE(review): placement inside the branch is reconstructed from a whitespace-stripped source.
        print('Storing credentials to ' + credential_path)
    return credentials
def get_google_spreadsheets_api_client(scopes=('https://www.googleapis.com/auth/spreadsheets', )):
    """
    Build a client for the "sheets" API (v4), authorized with the credentials obtained for ``scopes``.

    :param scopes: scopes requested from ``get_credentials``
    :return: the object returned by ``discovery.build``
    """
    authorized_http = get_credentials(scopes=scopes).authorize(httplib2.Http())
    return discovery.build(
        'sheets',
        'v4',
        http=authorized_http,
        discoveryServiceUrl='https://sheets.googleapis.com/$discovery/rest?version=v4',
        cache_discovery=False,
    )
def get_google_people_api_client(scopes=('https://www.googleapis.com/auth/contacts', )):
    """
    Build a client for the "people" API (v1), authorized with the credentials obtained for ``scopes``.

    :param scopes: scopes requested from ``get_credentials``
    :return: the object returned by ``discovery.build``
    """
    authorized_http = get_credentials(scopes=scopes).authorize(httplib2.Http())
    return discovery.build(
        'people',
        'v1',
        http=authorized_http,
        discoveryServiceUrl='https://people.googleapis.com/$discovery/rest?version=v1',
        cache_discovery=False,
    )

View File

@ -1,4 +1,4 @@
from .plugin import JupyterOutputPlugin from bonobo.plugins.jupyter import JupyterOutputPlugin
def _jupyter_nbextension_paths(): def _jupyter_nbextension_paths():

1
bonobo/contrib/jupyter/js/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/node_modules

View File

@ -0,0 +1,19 @@
Bonobo within Jupyter
=====================
Install
-------
.. code-block:: shell-session
yarn install
Watch mode (for development)
----------------------------
.. code-block:: shell-session
yarn run webpack --watch

View File

@ -69,7 +69,7 @@ define(["jupyter-js-widgets"], function(__WEBPACK_EXTERNAL_MODULE_2__) { return
// When serialiazing entire widget state for embedding, only values different from the // When serialiazing entire widget state for embedding, only values different from the
// defaults will be specified. // defaults will be specified.
var BonoboModel = widgets.DOMWidgetModel.extend({ const BonoboModel = widgets.DOMWidgetModel.extend({
defaults: _.extend({}, widgets.DOMWidgetModel.prototype.defaults, { defaults: _.extend({}, widgets.DOMWidgetModel.prototype.defaults, {
_model_name: 'BonoboModel', _model_name: 'BonoboModel',
_view_name: 'BonoboView', _view_name: 'BonoboView',
@ -81,7 +81,7 @@ define(["jupyter-js-widgets"], function(__WEBPACK_EXTERNAL_MODULE_2__) { return
// Custom View. Renders the widget model. // Custom View. Renders the widget model.
var BonoboView = widgets.DOMWidgetView.extend({ const BonoboView = widgets.DOMWidgetView.extend({
render: function () { render: function () {
this.value_changed(); this.value_changed();
this.model.on('change:value', this.value_changed, this); this.model.on('change:value', this.value_changed, this);
@ -89,7 +89,9 @@ define(["jupyter-js-widgets"], function(__WEBPACK_EXTERNAL_MODULE_2__) { return
value_changed: function () { value_changed: function () {
this.$el.html( this.$el.html(
this.model.get('value').join('<br>') '<div class="rendered_html"><table style="margin: 0; border: 1px solid black;">' + this.model.get('value').map((key, i) => {
return `<tr><td>${key.status}</td><td>${key.name}</td><td>${key.stats}</td><td>${key.flags}</td></tr>`
}).join('\n') + '</table></div>'
); );
}, },
}); });

File diff suppressed because one or more lines are too long

View File

@ -8,7 +8,7 @@ var _ = require('underscore');
// When serializing entire widget state for embedding, only values different from the // When serializing entire widget state for embedding, only values different from the
// defaults will be specified. // defaults will be specified.
var BonoboModel = widgets.DOMWidgetModel.extend({ const BonoboModel = widgets.DOMWidgetModel.extend({
defaults: _.extend({}, widgets.DOMWidgetModel.prototype.defaults, { defaults: _.extend({}, widgets.DOMWidgetModel.prototype.defaults, {
_model_name: 'BonoboModel', _model_name: 'BonoboModel',
_view_name: 'BonoboView', _view_name: 'BonoboView',
@ -20,7 +20,7 @@ var BonoboModel = widgets.DOMWidgetModel.extend({
// Custom View. Renders the widget model. // Custom View. Renders the widget model.
var BonoboView = widgets.DOMWidgetView.extend({ const BonoboView = widgets.DOMWidgetView.extend({
render: function () { render: function () {
this.value_changed(); this.value_changed();
this.model.on('change:value', this.value_changed, this); this.model.on('change:value', this.value_changed, this);
@ -28,7 +28,9 @@ var BonoboView = widgets.DOMWidgetView.extend({
value_changed: function () { value_changed: function () {
this.$el.html( this.$el.html(
this.model.get('value').join('<br>') '<div class="rendered_html"><table style="margin: 0; border: 1px solid black;">' + this.model.get('value').map((key, i) => {
return `<tr><td>${key.status}</td><td>${key.name}</td><td>${key.stats}</td><td>${key.flags}</td></tr>`
}).join('\n') + '</table></div>'
); );
}, },
}); });

View File

@ -72,7 +72,7 @@ define(["jupyter-js-widgets"], function(__WEBPACK_EXTERNAL_MODULE_2__) { return
// When serializing entire widget state for embedding, only values different from the // When serializing entire widget state for embedding, only values different from the
// defaults will be specified. // defaults will be specified.
var BonoboModel = widgets.DOMWidgetModel.extend({ const BonoboModel = widgets.DOMWidgetModel.extend({
defaults: _.extend({}, widgets.DOMWidgetModel.prototype.defaults, { defaults: _.extend({}, widgets.DOMWidgetModel.prototype.defaults, {
_model_name: 'BonoboModel', _model_name: 'BonoboModel',
_view_name: 'BonoboView', _view_name: 'BonoboView',
@ -84,7 +84,7 @@ define(["jupyter-js-widgets"], function(__WEBPACK_EXTERNAL_MODULE_2__) { return
// Custom View. Renders the widget model. // Custom View. Renders the widget model.
var BonoboView = widgets.DOMWidgetView.extend({ const BonoboView = widgets.DOMWidgetView.extend({
render: function () { render: function () {
this.value_changed(); this.value_changed();
this.model.on('change:value', this.value_changed, this); this.model.on('change:value', this.value_changed, this);
@ -92,7 +92,9 @@ define(["jupyter-js-widgets"], function(__WEBPACK_EXTERNAL_MODULE_2__) { return
value_changed: function () { value_changed: function () {
this.$el.html( this.$el.html(
this.model.get('value').join('<br>') '<div class="rendered_html"><table style="margin: 0; border: 1px solid black;">' + this.model.get('value').map((key, i) => {
return `<tr><td>${key.status}</td><td>${key.name}</td><td>${key.stats}</td><td>${key.flags}</td></tr>`
}).join('\n') + '</table></div>'
); );
}, },
}); });

File diff suppressed because one or more lines are too long

View File

@ -14,14 +14,14 @@ def path_str(path):
class OpenDataSoftAPI(Configurable): class OpenDataSoftAPI(Configurable):
dataset = Option(str, positional=True) dataset = Option(str, positional=True)
endpoint = Option(str, default='{scheme}://{netloc}{path}') endpoint = Option(str, required=False, default='{scheme}://{netloc}{path}')
scheme = Option(str, default='https') scheme = Option(str, required=False, default='https')
netloc = Option(str, default='data.opendatasoft.com') netloc = Option(str, required=False, default='data.opendatasoft.com')
path = Option(path_str, default='/api/records/1.0/search/') path = Option(path_str, required=False, default='/api/records/1.0/search/')
rows = Option(int, default=500) rows = Option(int, required=False, default=500)
limit = Option(int, required=False) limit = Option(int, required=False)
timezone = Option(str, default='Europe/Paris') timezone = Option(str, required=False, default='Europe/Paris')
kwargs = Option(dict, default=dict) kwargs = Option(dict, required=False, default=dict)
@ContextProcessor @ContextProcessor
def compute_path(self, context): def compute_path(self, context):
@ -44,7 +44,11 @@ class OpenDataSoftAPI(Configurable):
break break
for row in records: for row in records:
yield {**row.get('fields', {}), 'geometry': row.get('geometry', {})} yield {
**row.get('fields', {}),
'geometry': row.get('geometry', {}),
'recordid': row.get('recordid'),
}
start += self.rows start += self.rows

View File

@ -1,31 +1,4 @@
# -*- coding: utf-8 -*- from bonobo.util import get_name
#
# Copyright 2012-2014 Romain Dorgueil
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
class AbstractError(NotImplementedError):
    """Error raised by a method that is declared but deliberately left without an implementation.

    The message names the owner of the bound method and the method itself.
    """

    def __init__(self, method):
        """Build the error message from a bound method.

        :param method: bound method (``__self__`` is either a class, for classmethods, or an instance).
        """
        owner = method.__self__
        # A classmethod is bound to the class, which has a ``__name__``; a regular method is bound to an
        # instance, which does not: use the name of the instance's type in that case instead of failing
        # with an AttributeError while building the error message.
        class_name = getattr(owner, '__name__', type(owner).__name__)
        super().__init__(
            'Call to abstract method {class_name}.{method_name}(...): missing implementation.'.format(
                class_name=class_name,
                method_name=method.__name__,
            )
        )
class InactiveIOError(IOError): class InactiveIOError(IOError):
@ -63,6 +36,22 @@ class UnrecoverableError(Exception):
because you know that your transformation has no point continuing running after a bad event.""" because you know that your transformation has no point continuing running after a bad event."""
class AbstractError(UnrecoverableError, NotImplementedError):
    """Error raised by a method that is declared but deliberately left without an implementation.

    It is both an ``UnrecoverableError`` and a ``NotImplementedError``.
    """

    def __init__(self, method):
        # Both the owner of the bound method and the method itself are named through ``get_name``.
        message = 'Call to abstract method {class_name}.{method_name}(...): missing implementation.'.format(
            class_name=get_name(method.__self__),
            method_name=get_name(method),
        )
        super().__init__(message)
class UnrecoverableTypeError(UnrecoverableError, TypeError):
    """A ``TypeError`` that is also an ``UnrecoverableError``."""
class UnrecoverableValueError(UnrecoverableError, ValueError): class UnrecoverableValueError(UnrecoverableError, ValueError):
pass pass

View File

@ -1,23 +1,32 @@
def require(package, requirement=None): import bonobo
requirement = requirement or package
try:
return __import__(package) def get_argument_parser(parser=None):
except ImportError: parser = bonobo.get_argument_parser(parser=parser)
from colorama import Fore, Style
print( parser.add_argument(
Fore.YELLOW, '--limit',
'This example requires the {!r} package. Install it using:'. '-l',
format(requirement), type=int,
Style.RESET_ALL, default=None,
sep='' help='If set, limits the number of processed lines.'
) )
print() parser.add_argument(
print( '--print',
Fore.YELLOW, '-p',
' $ pip install {!s}'.format(requirement), action='store_true',
Style.RESET_ALL, default=False,
sep='' help='If set, pretty prints before writing to output file.'
) )
print()
raise return parser
def get_graph_options(options):
    """Convert parsed command line options into keyword arguments for the example graph factories.

    The ``limit`` and ``print`` entries are removed from ``options`` (the mapping is modified in place).

    :param options: mutable mapping of parsed options.
    :return: dict with two tuples, ``_limit`` and ``_print``, each holding either one node or nothing, so
        that they can be star-unpacked into a chain.
    """
    max_rows = options.pop('limit', None)
    pretty = options.pop('print', False)

    # A falsy limit (None or 0) means "no limit node at all".
    limit_nodes = (bonobo.Limit(max_rows), ) if max_rows else ()
    print_nodes = (bonobo.PrettyPrinter(), ) if pretty else ()

    return {'_limit': limit_nodes, '_print': print_nodes}

View File

@ -0,0 +1,5 @@
if __name__ == '__main__':
    # Running this package as a script delegates to the command line entrypoint, with "examples" inserted
    # in front of the arguments given by the user.
    import sys

    from bonobo.commands import entrypoint

    entrypoint(['examples', *sys.argv[1:]])

27
bonobo/examples/clock.py Normal file
View File

@ -0,0 +1,27 @@
import bonobo
import datetime
import time
def extract(*, count=60, interval=1):
    """Yield the current date and time ``count`` times, pausing between consecutive values.

    :param count: number of values to produce (60 by default).
    :param interval: seconds slept before each value except the first one (1 by default).
    :return: generator of ``datetime.datetime`` objects, as returned by ``datetime.datetime.now()``.
    """
    for tick in range(count):
        # No pause before the very first value, so that the output starts immediately.
        if tick:
            time.sleep(interval)
        yield datetime.datetime.now()
def get_graph():
    """Build the clock graph: a single chain sending what ``extract`` yields to the builtin ``print``."""
    clock = bonobo.Graph()
    clock.add_chain(extract, print)
    return clock
if __name__ == '__main__':
    # The graph is executed within the context set up from the parsed command line arguments.
    argument_parser = bonobo.get_argument_parser()
    with bonobo.parse_args(argument_parser):
        bonobo.run(get_graph())

View File

@ -0,0 +1,62 @@
import os
import bonobo
from bonobo import examples
from bonobo.examples.datasets.coffeeshops import get_graph as get_coffeeshops_graph
from bonobo.examples.datasets.fablabs import get_graph as get_fablabs_graph
from bonobo.examples.datasets.services import get_services, get_datasets_dir, get_minor_version
# Maps each dataset name accepted by --target to the factory that adds its chains to a graph.
graph_factories = {
    'coffeeshops': get_coffeeshops_graph,
    'fablabs': get_fablabs_graph,
}

if __name__ == '__main__':
    parser = examples.get_argument_parser()
    parser.add_argument('--target', '-t', choices=graph_factories.keys(), nargs='+')
    parser.add_argument('--sync', action='store_true', default=False)

    with bonobo.parse_args(parser) as options:
        graph_options = examples.get_graph_options(options)
        # Without an explicit --target, every known dataset is generated, in alphabetical order.
        graph_names = list(options['target'] or sorted(graph_factories))

        # Create a graph with all requested subgraphs
        graph = bonobo.Graph()
        for name in graph_names:
            graph = graph_factories[name](graph, **graph_options)

        bonobo.run(graph, services=get_services())

        if options['sync']:
            # TODO: when parallel option for node will be implemented, need to be rewritten to use a graph.
            import posixpath

            import boto3

            s3 = boto3.client('s3')

            local_dir = get_datasets_dir()
            for root, _dirs, files in os.walk(local_dir):
                for filename in files:
                    local_path = os.path.join(root, filename)
                    relative_path = os.path.relpath(local_path, local_dir)
                    # Object keys are built with forward slashes whatever the local path separator is, so
                    # that the same keys are produced on every platform.
                    s3_path = posixpath.join(get_minor_version(), relative_path.replace(os.sep, '/'))

                    try:
                        s3.head_object(Bucket='bonobo-examples', Key=s3_path)
                    except Exception:
                        # Best effort: any failure of the HEAD request (typically, the object is not there
                        # yet) triggers an upload. KeyboardInterrupt and SystemExit are not swallowed.
                        s3.upload_file(
                            local_path,
                            'bonobo-examples',
                            s3_path,
                            ExtraArgs={'ACL': 'public-read'},
                        )

View File

@ -1,7 +0,0 @@
from os.path import dirname
import bonobo
def get_services():
    """Return the services used by the examples: ``fs``, a filesystem opened on this module's directory."""
    module_dir = dirname(__file__)
    filesystem = bonobo.open_fs(module_dir)
    return {'fs': filesystem}

View File

@ -1,182 +0,0 @@
{"les montparnos": "65 boulevard Pasteur, 75015 Paris, France",
"Coffee Chope": "344Vrue Vaugirard, 75015 Paris, France",
"Caf\u00e9 Lea": "5 rue Claude Bernard, 75005 Paris, France",
"Le Bellerive": "71 quai de Seine, 75019 Paris, France",
"Le drapeau de la fidelit\u00e9": "21 rue Copreaux, 75015 Paris, France",
"O q de poule": "53 rue du ruisseau, 75018 Paris, France",
"Le caf\u00e9 des amis": "125 rue Blomet, 75015 Paris, France",
"Le chantereine": "51 Rue Victoire, 75009 Paris, France",
"Le M\u00fcller": "11 rue Feutrier, 75018 Paris, France",
"Ext\u00e9rieur Quai": "5, rue d'Alsace, 75010 Paris, France",
"La Bauloise": "36 rue du hameau, 75015 Paris, France",
"Le Dellac": "14 rue Rougemont, 75009 Paris, France",
"Le Bosquet": "46 avenue Bosquet, 75007 Paris, France",
"Le Sully": "6 Bd henri IV, 75004 Paris, France",
"Le Felteu": "1 rue Pecquay, 75004 Paris, France",
"Le bistrot de Ma\u00eblle et Augustin": "42 rue coquill\u00e8re, 75001 Paris, France",
"D\u00e9d\u00e9 la frite": "52 rue Notre-Dame des Victoires, 75002 Paris, France",
"Cardinal Saint-Germain": "11 boulevard Saint-Germain, 75005 Paris, France",
"Le Reynou": "2 bis quai de la m\u00e9gisserie, 75001 Paris, France",
"Aux cadrans": "21 ter boulevard Diderot, 75012 Paris, France",
"Le Saint Jean": "23 rue des abbesses, 75018 Paris, France",
"La Renaissance": "112 Rue Championnet, 75018 Paris, France",
"Le Square": "31 rue Saint-Dominique, 75007 Paris, France",
"Les Arcades": "61 rue de Ponthieu, 75008 Paris, France",
"Le Kleemend's": "34 avenue Pierre Mend\u00e8s-France, 75013 Paris, France",
"Assaporare Dix sur Dix": "75, avenue Ledru-Rollin, 75012 Paris, France",
"Caf\u00e9 Pierre": "202 rue du faubourg st antoine, 75012 Paris, France",
"Caf\u00e9 antoine": "17 rue Jean de la Fontaine, 75016 Paris, France",
"Au cerceau d'or": "129 boulevard sebastopol, 75002 Paris, France",
"La Caravane": "Rue de la Fontaine au Roi, 75011 Paris, France",
"Le Pas Sage": "1 Passage du Grand Cerf, 75002 Paris, France",
"Le Caf\u00e9 Livres": "10 rue Saint Martin, 75004 Paris, France",
"Le Chaumontois": "12 rue Armand Carrel, 75018 Paris, France",
"Drole d'endroit pour une rencontre": "58 rue de Montorgueil, 75002 Paris, France",
"Le pari's caf\u00e9": "104 rue caulaincourt, 75018 Paris, France",
"Le Poulailler": "60 rue saint-sabin, 75011 Paris, France",
"Chai 33": "33 Cour Saint Emilion, 75012 Paris, France",
"L'Assassin": "99 rue Jean-Pierre Timbaud, 75011 Paris, France",
"l'Usine": "1 rue d'Avron, 75020 Paris, France",
"La Bricole": "52 rue Liebniz, 75018 Paris, France",
"le ronsard": "place maubert, 75005 Paris, France",
"Face Bar": "82 rue des archives, 75003 Paris, France",
"American Kitchen": "49 rue bichat, 75010 Paris, France",
"La Marine": "55 bis quai de valmy, 75010 Paris, France",
"Le Bloc": "21 avenue Brochant, 75017 Paris, France",
"La Recoleta au Manoir": "229 avenue Gambetta, 75020 Paris, France",
"Le Pareloup": "80 Rue Saint-Charles, 75015 Paris, France",
"La Brasserie Gait\u00e9": "3 rue de la Gait\u00e9, 75014 Paris, France",
"Caf\u00e9 Zen": "46 rue Victoire, 75009 Paris, France",
"O'Breizh": "27 rue de Penthi\u00e8vre, 75008 Paris, France",
"Le Petit Choiseul": "23 rue saint augustin, 75002 Paris, France",
"Invitez vous chez nous": "7 rue Ep\u00e9e de Bois, 75005 Paris, France",
"La Cordonnerie": "142 Rue Saint-Denis 75002 Paris, 75002 Paris, France",
"Le Supercoin": "3, rue Baudelique, 75018 Paris, France",
"Populettes": "86 bis rue Riquet, 75018 Paris, France",
"Au bon coin": "49 rue des Cloys, 75018 Paris, France",
"Le Couvent": "69 rue Broca, 75013 Paris, France",
"La Br\u00fblerie des Ternes": "111 rue mouffetard, 75005 Paris, France",
"L'\u00c9cir": "59 Boulevard Saint-Jacques, 75014 Paris, France",
"Le Chat bossu": "126, rue du Faubourg Saint Antoine, 75012 Paris, France",
"Denfert caf\u00e9": "58 boulvevard Saint Jacques, 75014 Paris, France",
"Le Caf\u00e9 frapp\u00e9": "95 rue Montmartre, 75002 Paris, France",
"La Perle": "78 rue vieille du temple, 75003 Paris, France",
"Le Descartes": "1 rue Thouin, 75005 Paris, France",
"Bagels & Coffee Corner": "Place de Clichy, 75017 Paris, France",
"Le petit club": "55 rue de la tombe Issoire, 75014 Paris, France",
"Le Plein soleil": "90 avenue Parmentier, 75011 Paris, France",
"Le Relais Haussmann": "146, boulevard Haussmann, 75008 Paris, France",
"Le Malar": "88 rue Saint-Dominique, 75007 Paris, France",
"Au panini de la place": "47 rue Belgrand, 75020 Paris, France",
"Le Village": "182 rue de Courcelles, 75017 Paris, France",
"Pause Caf\u00e9": "41 rue de Charonne, 75011 Paris, France",
"Le Pure caf\u00e9": "14 rue Jean Mac\u00e9, 75011 Paris, France",
"Extra old caf\u00e9": "307 fg saint Antoine, 75011 Paris, France",
"Chez Fafa": "44 rue Vinaigriers, 75010 Paris, France",
"En attendant l'or": "3 rue Faidherbe, 75011 Paris, France",
"Br\u00fblerie San Jos\u00e9": "30 rue des Petits-Champs, 75002 Paris, France",
"Caf\u00e9 de la Mairie (du VIII)": "rue de Lisbonne, 75008 Paris, France",
"Caf\u00e9 Martin": "2 place Martin Nadaud, 75001 Paris, France",
"Etienne": "14 rue Turbigo, Paris, 75001 Paris, France",
"L'ing\u00e9nu": "184 bd Voltaire, 75011 Paris, France",
"L'Olive": "8 rue L'Olive, 75018 Paris, France",
"Le Biz": "18 rue Favart, 75002 Paris, France",
"Le Cap Bourbon": "1 rue Louis le Grand, 75002 Paris, France",
"Le General Beuret": "9 Place du General Beuret, 75015 Paris, France",
"Le Germinal": "95 avenue Emile Zola, 75015 Paris, France",
"Le Ragueneau": "202 rue Saint-Honor\u00e9, 75001 Paris, France",
"Le refuge": "72 rue lamarck, 75018 Paris, France",
"Le sully": "13 rue du Faubourg Saint Denis, 75010 Paris, France",
"Le Dunois": "77 rue Dunois, 75013 Paris, France",
"La Montagne Sans Genevi\u00e8ve": "13 Rue du Pot de Fer, 75005 Paris, France",
"Le Caminito": "48 rue du Dessous des Berges, 75013 Paris, France",
"Le petit Bretonneau": "Le petit Bretonneau - \u00e0 l'int\u00e9rieur de l'H\u00f4pital, 75018 Paris, France",
"La chaumi\u00e8re gourmande": "Route de la Muette \u00e0 Neuilly",
"Club hippique du Jardin d\u2019Acclimatation": "75016 Paris, France",
"Le bal du pirate": "60 rue des bergers, 75015 Paris, France",
"Le Zazabar": "116 Rue de M\u00e9nilmontant, 75020 Paris, France",
"L'antre d'eux": "16 rue DE MEZIERES, 75006 Paris, France",
"l'orillon bar": "35 rue de l'orillon, 75011 Paris, France",
"zic zinc": "95 rue claude decaen, 75012 Paris, France",
"Les P\u00e8res Populaires": "46 rue de Buzenval, 75020 Paris, France",
"Epicerie Musicale": "55bis quai de Valmy, 75010 Paris, France",
"Le relais de la victoire": "73 rue de la Victoire, 75009 Paris, France",
"Le Centenaire": "104 rue amelot, 75011 Paris, France",
"Cafe de grenelle": "188 rue de Grenelle, 75007 Paris, France",
"Ragueneau": "202 rue Saint Honor\u00e9, 75001 Paris, France",
"Caf\u00e9 Pistache": "9 rue des petits champs, 75001 Paris, France",
"La Cagnotte": "13 Rue Jean-Baptiste Dumay, 75020 Paris, France",
"Le Killy Jen": "28 bis boulevard Diderot, 75012 Paris, France",
"Caf\u00e9 beauveau": "9 rue de Miromesnil, 75008 Paris, France",
"le 1 cinq": "172 rue de vaugirard, 75015 Paris, France",
"Les Artisans": "106 rue Lecourbe, 75015 Paris, France",
"Peperoni": "83 avenue de Wagram, 75001 Paris, France",
"Le Brio": "216, rue Marcadet, 75018 Paris, France",
"Tamm Bara": "7 rue Clisson, 75013 Paris, France",
"Caf\u00e9 dans l'aerogare Air France Invalides": "2 rue Robert Esnault Pelterie, 75007 Paris, France",
"bistrot les timbr\u00e9s": "14 rue d'alleray, 75015 Paris, France",
"Caprice caf\u00e9": "12 avenue Jean Moulin, 75014 Paris, France",
"Caves populaires": "22 rue des Dames, 75017 Paris, France",
"Au Vin Des Rues": "21 rue Boulard, 75014 Paris, France",
"Chez Prune": "36 rue Beaurepaire, 75010 Paris, France",
"L'In\u00e9vitable": "22 rue Linn\u00e9, 75005 Paris, France",
"L'anjou": "1 rue de Montholon, 75009 Paris, France",
"Botak cafe": "1 rue Paul albert, 75018 Paris, France",
"Bistrot Saint-Antoine": "58 rue du Fbg Saint-Antoine, 75012 Paris, France",
"Chez Oscar": "11/13 boulevard Beaumarchais, 75004 Paris, France",
"Le Piquet": "48 avenue de la Motte Picquet, 75015 Paris, France",
"L'avant comptoir": "3 carrefour de l'Od\u00e9on, 75006 Paris, France",
"le chateau d'eau": "67 rue du Ch\u00e2teau d'eau, 75010 Paris, France",
"Les Vendangeurs": "6/8 rue Stanislas, 75006 Paris, France",
"maison du vin": "52 rue des plantes, 75014 Paris, France",
"Le Tournebride": "104 rue Mouffetard, 75005 Paris, France",
"Le Fronton": "63 rue de Ponthieu, 75008 Paris, France",
"Le BB (Bouchon des Batignolles)": "2 rue Lemercier, 75017 Paris, France",
"La cantine de Zo\u00e9": "136 rue du Faubourg poissonni\u00e8re, 75010 Paris, France",
"Chez Rutabaga": "16 rue des Petits Champs, 75002 Paris, France",
"Les caves populaires": "22 rue des Dames, 75017 Paris, France",
"Le Plomb du cantal": "3 rue Ga\u00eet\u00e9, 75014 Paris, France",
"Trois pi\u00e8ces cuisine": "101 rue des dames, 75017 Paris, France",
"La Brocante": "10 rue Rossini, 75009 Paris, France",
"Le Zinc": "61 avenue de la Motte Picquet, 75015 Paris, France",
"Chez Luna": "108 rue de M\u00e9nilmontant, 75020 Paris, France",
"Le bar Fleuri": "1 rue du Plateau, 75019 Paris, France",
"La Libert\u00e9": "196 rue du faubourg saint-antoine, 75012 Paris, France",
"La cantoche de Paname": "40 Boulevard Beaumarchais, 75011 Paris, France",
"Le Saint Ren\u00e9": "148 Boulevard de Charonne, 75020 Paris, France",
"Caf\u00e9 Clochette": "16 avenue Richerand, 75010 Paris, France",
"L'europ\u00e9en": "21 Bis Boulevard Diderot, 75012 Paris, France",
"NoMa": "39 rue Notre Dame de Nazareth, 75003 Paris, France",
"le lutece": "380 rue de vaugirard, 75015 Paris, France",
"O'Paris": "1 Rue des Envierges, 75020 Paris, France",
"Rivolux": "16 rue de Rivoli, 75004 Paris, France",
"Brasiloja": "16 rue Ganneron, 75018 Paris, France",
"Institut des Cultures d'Islam": "19-23 rue L\u00e9on, 75018 Paris, France",
"Canopy Caf\u00e9 associatif": "19 rue Pajol, 75018 Paris, France",
"Petits Freres des Pauvres": "47 rue de Batignolles, 75017 Paris, France",
"Le Lucernaire": "53 rue Notre-Dame des Champs, 75006 Paris, France",
"L'Angle": "28 rue de Ponthieu, 75008 Paris, France",
"Le Caf\u00e9 d'avant": "35 rue Claude Bernard, 75005 Paris, France",
"Caf\u00e9 Dupont": "198 rue de la Convention, 75015 Paris, France",
"Le S\u00e9vign\u00e9": "15 rue du Parc Royal, 75003 Paris, France",
"L'Entracte": "place de l'opera, 75002 Paris, France",
"Panem": "18 rue de Crussol, 75011 Paris, France",
"Au pays de Vannes": "34 bis rue de Wattignies, 75012 Paris, France",
"l'El\u00e9phant du nil": "125 Rue Saint-Antoine, 75004 Paris, France",
"L'\u00e2ge d'or": "26 rue du Docteur Magnan, 75013 Paris, France",
"Le Comptoir": "354 bis rue Vaugirard, 75015 Paris, France",
"L'horizon": "93, rue de la Roquette, 75011 Paris, France",
"L'empreinte": "54, avenue Daumesnil, 75012 Paris, France",
"Caf\u00e9 Victor": "10 boulevard Victor, 75015 Paris, France",
"Caf\u00e9 Varenne": "36 rue de Varenne, 75007 Paris, France",
"Le Brigadier": "12 rue Blanche, 75009 Paris, France",
"Waikiki": "10 rue d\"Ulm, 75005 Paris, France",
"Le Parc Vaugirard": "358 rue de Vaugirard, 75015 Paris, France",
"Pari's Caf\u00e9": "174 avenue de Clichy, 75017 Paris, France",
"Melting Pot": "3 rue de Lagny, 75020 Paris, France",
"le Zango": "58 rue Daguerre, 75014 Paris, France",
"Chez Miamophile": "6 rue M\u00e9lingue, 75019 Paris, France",
"Le caf\u00e9 Monde et M\u00e9dias": "Place de la R\u00e9publique, 75003 Paris, France",
"Caf\u00e9 rallye tournelles": "11 Quai de la Tournelle, 75005 Paris, France",
"Brasserie le Morvan": "61 rue du ch\u00e2teau d'eau, 75010 Paris, France",
"L'entrep\u00f4t": "157 rue Bercy 75012 Paris, 75012 Paris, France"}

View File

@ -1,29 +1,64 @@
"""
Extracts a list of Parisian bars where you can buy a coffee for a reasonable price, and stores them in a flat text file.
.. graphviz::
digraph {
rankdir = LR;
stylesheet = "../_static/graphs.css";
BEGIN [shape="point"];
BEGIN -> "ODS()" -> "transform" -> "FileWriter()";
}
"""
import bonobo import bonobo
from bonobo.commands.run import get_default_services from bonobo import examples
from bonobo.ext.opendatasoft import OpenDataSoftAPI from bonobo.contrib.opendatasoft import OpenDataSoftAPI as ODSReader
from bonobo.examples.datasets.services import get_services
filename = 'coffeeshops.txt'
graph = bonobo.Graph( def get_graph(graph=None, *, _limit=(), _print=()):
OpenDataSoftAPI(dataset='liste-des-cafes-a-un-euro', netloc='opendata.paris.fr'), graph = graph or bonobo.Graph()
lambda row: '{nom_du_cafe}, {adresse}, {arrondissement} Paris, France'.format(**row),
bonobo.FileWriter(path=filename), producer = graph.add_chain(
) ODSReader(
dataset='liste-des-cafes-a-un-euro',
netloc='opendata.paris.fr'
),
*_limit,
bonobo.UnpackItems(0),
bonobo.Rename(
name='nom_du_cafe',
address='adresse',
zipcode='arrondissement'
),
bonobo.Format(city='Paris', country='France'),
bonobo.OrderFields(
[
'name', 'address', 'zipcode', 'city', 'country',
'geometry', 'geoloc'
]
),
*_print,
)
# Comma separated values.
graph.add_chain(
bonobo.CsvWriter(
'coffeeshops.csv',
fields=['name', 'address', 'zipcode', 'city'],
delimiter=','
),
_input=producer.output,
)
# Standard JSON
graph.add_chain(
bonobo.JsonWriter(path='coffeeshops.json'),
_input=producer.output,
)
# Line-delimited JSON
graph.add_chain(
bonobo.LdjsonWriter(path='coffeeshops.ldjson'),
_input=producer.output,
)
return graph
if __name__ == '__main__': if __name__ == '__main__':
bonobo.run(graph, services=get_default_services(__file__)) parser = examples.get_argument_parser()
with bonobo.parse_args(parser) as options:
bonobo.run(
get_graph(**examples.get_graph_options(options)),
services=get_services()
)

View File

@ -1,182 +0,0 @@
Extérieur Quai, 5, rue d'Alsace, 75010 Paris, France
Le Sully, 6 Bd henri IV, 75004 Paris, France
O q de poule, 53 rue du ruisseau, 75018 Paris, France
Le Pas Sage, 1 Passage du Grand Cerf, 75002 Paris, France
La Renaissance, 112 Rue Championnet, 75018 Paris, France
La Caravane, Rue de la Fontaine au Roi, 75011 Paris, France
Le chantereine, 51 Rue Victoire, 75009 Paris, France
Le Müller, 11 rue Feutrier, 75018 Paris, France
Le drapeau de la fidelité, 21 rue Copreaux, 75015 Paris, France
Le café des amis, 125 rue Blomet, 75015 Paris, France
Le Café Livres, 10 rue Saint Martin, 75004 Paris, France
Le Bosquet, 46 avenue Bosquet, 75007 Paris, France
Le Chaumontois, 12 rue Armand Carrel, 75018 Paris, France
Le Kleemend's, 34 avenue Pierre Mendès-France, 75013 Paris, France
Café Pierre, 202 rue du faubourg st antoine, 75012 Paris, France
Les Arcades, 61 rue de Ponthieu, 75008 Paris, France
Le Square, 31 rue Saint-Dominique, 75007 Paris, France
Assaporare Dix sur Dix, 75, avenue Ledru-Rollin, 75012 Paris, France
Au cerceau d'or, 129 boulevard sebastopol, 75002 Paris, France
Aux cadrans, 21 ter boulevard Diderot, 75012 Paris, France
Café antoine, 17 rue Jean de la Fontaine, 75016 Paris, France
Café de la Mairie (du VIII), rue de Lisbonne, 75008 Paris, France
Café Lea, 5 rue Claude Bernard, 75005 Paris, France
Cardinal Saint-Germain, 11 boulevard Saint-Germain, 75005 Paris, France
Dédé la frite, 52 rue Notre-Dame des Victoires, 75002 Paris, France
La Bauloise, 36 rue du hameau, 75015 Paris, France
Le Bellerive, 71 quai de Seine, 75019 Paris, France
Le bistrot de Maëlle et Augustin, 42 rue coquillère, 75001 Paris, France
Le Dellac, 14 rue Rougemont, 75009 Paris, France
Le Felteu, 1 rue Pecquay, 75004 Paris, France
Le Reynou, 2 bis quai de la mégisserie, 75001 Paris, France
Le Saint Jean, 23 rue des abbesses, 75018 Paris, France
les montparnos, 65 boulevard Pasteur, 75015 Paris, France
L'antre d'eux, 16 rue DE MEZIERES, 75006 Paris, France
Drole d'endroit pour une rencontre, 58 rue de Montorgueil, 75002 Paris, France
Le pari's café, 104 rue caulaincourt, 75018 Paris, France
Le Poulailler, 60 rue saint-sabin, 75011 Paris, France
Chai 33, 33 Cour Saint Emilion, 75012 Paris, France
L'Assassin, 99 rue Jean-Pierre Timbaud, 75011 Paris, France
l'Usine, 1 rue d'Avron, 75020 Paris, France
La Bricole, 52 rue Liebniz, 75018 Paris, France
le ronsard, place maubert, 75005 Paris, France
Face Bar, 82 rue des archives, 75003 Paris, France
American Kitchen, 49 rue bichat, 75010 Paris, France
La Marine, 55 bis quai de valmy, 75010 Paris, France
Le Bloc, 21 avenue Brochant, 75017 Paris, France
La Recoleta au Manoir, 229 avenue Gambetta, 75020 Paris, France
Le Pareloup, 80 Rue Saint-Charles, 75015 Paris, France
La Brasserie Gaité, 3 rue de la Gaité, 75014 Paris, France
Café Zen, 46 rue Victoire, 75009 Paris, France
O'Breizh, 27 rue de Penthièvre, 75008 Paris, France
Le Petit Choiseul, 23 rue saint augustin, 75002 Paris, France
Invitez vous chez nous, 7 rue Epée de Bois, 75005 Paris, France
La Cordonnerie, 142 Rue Saint-Denis 75002 Paris, 75002 Paris, France
Le Supercoin, 3, rue Baudelique, 75018 Paris, France
Populettes, 86 bis rue Riquet, 75018 Paris, France
Au bon coin, 49 rue des Cloys, 75018 Paris, France
Le Couvent, 69 rue Broca, 75013 Paris, France
La Brûlerie des Ternes, 111 rue mouffetard, 75005 Paris, France
L'Écir, 59 Boulevard Saint-Jacques, 75014 Paris, France
Le Chat bossu, 126, rue du Faubourg Saint Antoine, 75012 Paris, France
Denfert café, 58 boulvevard Saint Jacques, 75014 Paris, France
Le Café frappé, 95 rue Montmartre, 75002 Paris, France
La Perle, 78 rue vieille du temple, 75003 Paris, France
Le Descartes, 1 rue Thouin, 75005 Paris, France
Le petit club, 55 rue de la tombe Issoire, 75014 Paris, France
Le Plein soleil, 90 avenue Parmentier, 75011 Paris, France
Le Relais Haussmann, 146, boulevard Haussmann, 75008 Paris, France
Le Malar, 88 rue Saint-Dominique, 75007 Paris, France
Au panini de la place, 47 rue Belgrand, 75020 Paris, France
Le Village, 182 rue de Courcelles, 75017 Paris, France
Pause Café, 41 rue de Charonne, 75011 Paris, France
Le Pure café, 14 rue Jean Macé, 75011 Paris, France
Extra old café, 307 fg saint Antoine, 75011 Paris, France
Chez Fafa, 44 rue Vinaigriers, 75010 Paris, France
En attendant l'or, 3 rue Faidherbe, 75011 Paris, France
Brûlerie San José, 30 rue des Petits-Champs, 75002 Paris, France
Café Martin, 2 place Martin Nadaud, 75001 Paris, France
Etienne, 14 rue Turbigo, Paris, 75001 Paris, France
L'ingénu, 184 bd Voltaire, 75011 Paris, France
L'Olive, 8 rue L'Olive, 75018 Paris, France
Le Biz, 18 rue Favart, 75002 Paris, France
Le Cap Bourbon, 1 rue Louis le Grand, 75002 Paris, France
Le General Beuret, 9 Place du General Beuret, 75015 Paris, France
Le Germinal, 95 avenue Emile Zola, 75015 Paris, France
Le Ragueneau, 202 rue Saint-Honoré, 75001 Paris, France
Le refuge, 72 rue lamarck, 75018 Paris, France
Le sully, 13 rue du Faubourg Saint Denis, 75010 Paris, France
Coffee Chope, 344Vrue Vaugirard, 75015 Paris, France
Le bal du pirate, 60 rue des bergers, 75015 Paris, France
zic zinc, 95 rue claude decaen, 75012 Paris, France
l'orillon bar, 35 rue de l'orillon, 75011 Paris, France
Le Zazabar, 116 Rue de Ménilmontant, 75020 Paris, France
L'Inévitable, 22 rue Linné, 75005 Paris, France
Le Dunois, 77 rue Dunois, 75013 Paris, France
Ragueneau, 202 rue Saint Honoré, 75001 Paris, France
Le Caminito, 48 rue du Dessous des Berges, 75013 Paris, France
Epicerie Musicale, 55bis quai de Valmy, 75010 Paris, France
Le petit Bretonneau, Le petit Bretonneau - à l'intérieur de l'Hôpital, 75018 Paris, France
Le Centenaire, 104 rue amelot, 75011 Paris, France
La Montagne Sans Geneviève, 13 Rue du Pot de Fer, 75005 Paris, France
Les Pères Populaires, 46 rue de Buzenval, 75020 Paris, France
Cafe de grenelle, 188 rue de Grenelle, 75007 Paris, France
Le relais de la victoire, 73 rue de la Victoire, 75009 Paris, France
La chaumière gourmande, Route de la Muette à Neuilly
Club hippique du Jardin dAcclimatation, 75016 Paris, France
Le Brio, 216, rue Marcadet, 75018 Paris, France
Caves populaires, 22 rue des Dames, 75017 Paris, France
Caprice café, 12 avenue Jean Moulin, 75014 Paris, France
Tamm Bara, 7 rue Clisson, 75013 Paris, France
L'anjou, 1 rue de Montholon, 75009 Paris, France
Café dans l'aerogare Air France Invalides, 2 rue Robert Esnault Pelterie, 75007 Paris, France
Chez Prune, 36 rue Beaurepaire, 75010 Paris, France
Au Vin Des Rues, 21 rue Boulard, 75014 Paris, France
bistrot les timbrés, 14 rue d'alleray, 75015 Paris, France
Café beauveau, 9 rue de Miromesnil, 75008 Paris, France
Café Pistache, 9 rue des petits champs, 75001 Paris, France
La Cagnotte, 13 Rue Jean-Baptiste Dumay, 75020 Paris, France
le 1 cinq, 172 rue de vaugirard, 75015 Paris, France
Le Killy Jen, 28 bis boulevard Diderot, 75012 Paris, France
Les Artisans, 106 rue Lecourbe, 75015 Paris, France
Peperoni, 83 avenue de Wagram, 75001 Paris, France
le lutece, 380 rue de vaugirard, 75015 Paris, France
Brasiloja, 16 rue Ganneron, 75018 Paris, France
Rivolux, 16 rue de Rivoli, 75004 Paris, France
L'européen, 21 Bis Boulevard Diderot, 75012 Paris, France
NoMa, 39 rue Notre Dame de Nazareth, 75003 Paris, France
O'Paris, 1 Rue des Envierges, 75020 Paris, France
Café Clochette, 16 avenue Richerand, 75010 Paris, France
La cantoche de Paname, 40 Boulevard Beaumarchais, 75011 Paris, France
Le Saint René, 148 Boulevard de Charonne, 75020 Paris, France
La Liberté, 196 rue du faubourg saint-antoine, 75012 Paris, France
Chez Rutabaga, 16 rue des Petits Champs, 75002 Paris, France
Le BB (Bouchon des Batignolles), 2 rue Lemercier, 75017 Paris, France
La Brocante, 10 rue Rossini, 75009 Paris, France
Le Plomb du cantal, 3 rue Gaîté, 75014 Paris, France
Les caves populaires, 22 rue des Dames, 75017 Paris, France
Chez Luna, 108 rue de Ménilmontant, 75020 Paris, France
Le bar Fleuri, 1 rue du Plateau, 75019 Paris, France
Trois pièces cuisine, 101 rue des dames, 75017 Paris, France
Le Zinc, 61 avenue de la Motte Picquet, 75015 Paris, France
La cantine de Zoé, 136 rue du Faubourg poissonnière, 75010 Paris, France
Les Vendangeurs, 6/8 rue Stanislas, 75006 Paris, France
L'avant comptoir, 3 carrefour de l'Odéon, 75006 Paris, France
Botak cafe, 1 rue Paul albert, 75018 Paris, France
le chateau d'eau, 67 rue du Château d'eau, 75010 Paris, France
Bistrot Saint-Antoine, 58 rue du Fbg Saint-Antoine, 75012 Paris, France
Chez Oscar, 11/13 boulevard Beaumarchais, 75004 Paris, France
Le Fronton, 63 rue de Ponthieu, 75008 Paris, France
Le Piquet, 48 avenue de la Motte Picquet, 75015 Paris, France
Le Tournebride, 104 rue Mouffetard, 75005 Paris, France
maison du vin, 52 rue des plantes, 75014 Paris, France
L'entrepôt, 157 rue Bercy 75012 Paris, 75012 Paris, France
Le café Monde et Médias, Place de la République, 75003 Paris, France
Café rallye tournelles, 11 Quai de la Tournelle, 75005 Paris, France
Brasserie le Morvan, 61 rue du château d'eau, 75010 Paris, France
Chez Miamophile, 6 rue Mélingue, 75019 Paris, France
Panem, 18 rue de Crussol, 75011 Paris, France
Petits Freres des Pauvres, 47 rue de Batignolles, 75017 Paris, France
Café Dupont, 198 rue de la Convention, 75015 Paris, France
L'Angle, 28 rue de Ponthieu, 75008 Paris, France
Institut des Cultures d'Islam, 19-23 rue Léon, 75018 Paris, France
Canopy Café associatif, 19 rue Pajol, 75018 Paris, France
L'Entracte, place de l'opera, 75002 Paris, France
Le Sévigné, 15 rue du Parc Royal, 75003 Paris, France
Le Café d'avant, 35 rue Claude Bernard, 75005 Paris, France
Le Lucernaire, 53 rue Notre-Dame des Champs, 75006 Paris, France
Le Brigadier, 12 rue Blanche, 75009 Paris, France
L'âge d'or, 26 rue du Docteur Magnan, 75013 Paris, France
Bagels & Coffee Corner, Place de Clichy, 75017 Paris, France
Café Victor, 10 boulevard Victor, 75015 Paris, France
L'empreinte, 54, avenue Daumesnil, 75012 Paris, France
L'horizon, 93, rue de la Roquette, 75011 Paris, France
Waikiki, 10 rue d"Ulm, 75005 Paris, France
Au pays de Vannes, 34 bis rue de Wattignies, 75012 Paris, France
Café Varenne, 36 rue de Varenne, 75007 Paris, France
l'Eléphant du nil, 125 Rue Saint-Antoine, 75004 Paris, France
Le Comptoir, 354 bis rue Vaugirard, 75015 Paris, France
Le Parc Vaugirard, 358 rue de Vaugirard, 75015 Paris, France
le Zango, 58 rue Daguerre, 75014 Paris, France
Melting Pot, 3 rue de Lagny, 75020 Paris, France
Pari's Café, 174 avenue de Clichy, 75017 Paris, France

View File

@ -16,11 +16,10 @@ and a flat txt file.
import json import json
from colorama import Fore, Style
import bonobo import bonobo
from bonobo.commands.run import get_default_services from bonobo import examples
from bonobo.ext.opendatasoft import OpenDataSoftAPI from bonobo.contrib.opendatasoft import OpenDataSoftAPI
from bonobo.examples.datasets.services import get_services
try: try:
import pycountry import pycountry
@ -29,8 +28,7 @@ except ImportError as exc:
'You must install package "pycountry" to run this example.' 'You must install package "pycountry" to run this example.'
) from exc ) from exc
API_DATASET = 'fablabs-in-the-world' API_DATASET = 'fablabs@public-us'
API_NETLOC = 'datanova.laposte.fr'
ROWS = 100 ROWS = 100
@ -40,65 +38,31 @@ def _getlink(x):
def normalize(row): def normalize(row):
result = { result = {
** **row,
row,
'links': list(filter(None, map(_getlink, json.loads(row.get('links'))))), 'links': list(filter(None, map(_getlink, json.loads(row.get('links'))))),
'country': pycountry.countries.get(alpha_2=row.get('country_code', '').upper()).name, 'country': pycountry.countries.get(alpha_2=row.get('country_code', '').upper()).name,
} }
return result return result
def display(row): def get_graph(graph=None, *, _limit=(), _print=()):
print(Style.BRIGHT, row.get('name'), Style.RESET_ALL, sep='') graph = graph or bonobo.Graph()
graph.add_chain(
OpenDataSoftAPI(dataset=API_DATASET),
*_limit,
normalize,
bonobo.UnpackItems(0),
*_print,
bonobo.JsonWriter(path='fablabs.json'),
)
return graph
address = list(
filter(
None, (
' '.join(
filter(
None, (
row.get('postal_code', None),
row.get('city', None)
)
)
),
row.get('county', None),
row.get('country'),
)
)
)
print(
' - {}address{}: {address}'.format(
Fore.BLUE, Style.RESET_ALL, address=', '.join(address)
)
)
print(
' - {}links{}: {links}'.format(
Fore.BLUE, Style.RESET_ALL, links=', '.join(row['links'])
)
)
print(
' - {}geometry{}: {geometry}'.format(
Fore.BLUE, Style.RESET_ALL, **row
)
)
print(
' - {}source{}: {source}'.format(
Fore.BLUE, Style.RESET_ALL, source='datanova/' + API_DATASET
)
)
graph = bonobo.Graph(
OpenDataSoftAPI(
dataset=API_DATASET, netloc=API_NETLOC, timezone='Europe/Paris'
),
normalize,
bonobo.Filter(filter=lambda row: row.get('country') == 'France'),
bonobo.JsonWriter(path='fablabs.txt', ioformat='arg0'),
bonobo.Tee(display),
)
if __name__ == '__main__': if __name__ == '__main__':
bonobo.run(graph, services=get_default_services(__file__)) parser = examples.get_argument_parser()
with bonobo.parse_args(parser) as options:
bonobo.run(
get_graph(**examples.get_graph_options(options)),
services=get_services()
)

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,20 @@
import os
import bonobo
def get_minor_version():
    """Return the first two dot-separated components of ``bonobo.__version__``, joined by a dot."""
    leading_parts = bonobo.__version__.split('.')[:2]
    return '.'.join(leading_parts)
def get_datasets_dir(*dirs):
    """
    Build, create if missing, and return a versioned cache directory under the user's home.

    The path is ``~/.cache/bonobo/<minor version>/<dirs...>``.

    :param dirs: optional path components appended below the versioned cache root
    :return: path of the directory, which exists once this function returns
    """
    cache_root = os.path.join(os.path.expanduser('~'), '.cache/bonobo', get_minor_version())
    target_dir = os.path.join(cache_root, *dirs)
    # exist_ok=True keeps repeated calls from failing once the directory is there.
    os.makedirs(target_dir, exist_ok=True)
    return target_dir
def get_services():
    """Return the services mapping: ``fs`` is ``bonobo.open_fs`` applied to the ``datasets`` cache directory."""
    datasets_fs = bonobo.open_fs(get_datasets_dir('datasets'))
    return {'fs': datasets_fs}

View File

@ -1,20 +0,0 @@
import os
import bonobo
def extract():
env_test_user = os.getenv('ENV_TEST_USER')
env_test_number = os.getenv('ENV_TEST_NUMBER')
env_test_string = os.getenv('ENV_TEST_STRING')
return env_test_user, env_test_number, env_test_string
def load(s: str):
print(s)
graph = bonobo.Graph(extract, load)
if __name__ == '__main__':
bonobo.run(graph)

View File

@ -0,0 +1,27 @@
"""
This transformation extracts the environment and prints it, sorted alphabetically, one item per line.
Used in the bonobo tests around environment management.
"""
import os
import bonobo
def extract_environ():
    """Yield each ``(name, value)`` pair of the process environment, in sorted order."""
    for pair in sorted(os.environ.items()):
        yield pair
def get_graph():
    """Build and return a graph whose single chain feeds ``extract_environ`` into ``print``."""
    environ_graph = bonobo.Graph()
    environ_graph.add_chain(extract_environ, print)
    return environ_graph
if __name__ == '__main__':
parser = bonobo.get_argument_parser()
with bonobo.parse_args(parser):
bonobo.run(get_graph())

View File

@ -2,4 +2,7 @@ from bonobo import get_examples_path, open_fs
def get_services(): def get_services():
return {'fs': open_fs(get_examples_path())} return {
'fs': open_fs(get_examples_path()),
'fs.output': open_fs(),
}

View File

@ -1,10 +1,36 @@
import bonobo import bonobo
from bonobo.commands.run import get_default_services from bonobo.examples.files._services import get_services
def get_graph(*, _limit=None, _print=False):
return bonobo.Graph(
bonobo.CsvReader('datasets/coffeeshops.txt'),
*((bonobo.Limit(_limit), ) if _limit else ()),
*((bonobo.PrettyPrinter(), ) if _print else ()),
bonobo.CsvWriter('coffeeshops.csv', fs='fs.output')
)
graph = bonobo.Graph(
bonobo.CsvReader('datasets/coffeeshops.txt', headers=('item', )),
bonobo.PrettyPrinter(),
)
if __name__ == '__main__': if __name__ == '__main__':
bonobo.run(graph, services=get_default_services(__file__)) parser = bonobo.get_argument_parser()
parser.add_argument(
'--limit',
'-l',
type=int,
default=None,
help='If set, limits the number of processed lines.'
)
parser.add_argument(
'--print',
'-p',
action='store_true',
default=False,
help='If set, pretty prints before writing to output file.'
)
with bonobo.parse_args(parser) as options:
bonobo.run(
get_graph(_limit=options['limit'], _print=options['print']),
services=get_services()
)

View File

@ -1,17 +1,50 @@
import bonobo import bonobo
from bonobo import Bag from bonobo.examples.files._services import get_services
from bonobo.commands.run import get_default_services
def get_fields(**row): def get_graph(*, _limit=None, _print=False):
return Bag(**row['fields']) graph = bonobo.Graph()
trunk = graph.add_chain(
bonobo.JsonReader('datasets/theaters.json'),
*((bonobo.Limit(_limit), ) if _limit else ()),
)
if _print:
graph.add_chain(bonobo.PrettyPrinter(), _input=trunk.output)
graph.add_chain(
bonobo.JsonWriter('theaters.json', fs='fs.output'),
_input=trunk.output
)
graph.add_chain(
bonobo.LdjsonWriter('theaters.ldjson', fs='fs.output'),
_input=trunk.output
)
return graph
graph = bonobo.Graph(
bonobo.JsonReader('datasets/theaters.json'),
get_fields,
bonobo.PrettyPrinter(),
)
if __name__ == '__main__': if __name__ == '__main__':
bonobo.run(graph, services=get_default_services(__file__)) parser = bonobo.get_argument_parser()
parser.add_argument(
'--limit',
'-l',
type=int,
default=None,
help='If set, limits the number of processed lines.'
)
parser.add_argument(
'--print',
'-p',
action='store_true',
default=False,
help='If set, pretty prints before writing to output file.'
)
with bonobo.parse_args(parser) as options:
bonobo.run(
get_graph(_limit=options['limit'], _print=options['print']),
services=get_services()
)

View File

@ -27,33 +27,51 @@ messages categorized as spam, and (3) prints the output.
''' '''
import bonobo
from bonobo.commands.run import get_default_services
from fs.tarfs import TarFS from fs.tarfs import TarFS
import bonobo
from bonobo import examples
def cleanse_sms(**row):
if row['category'] == 'spam': def cleanse_sms(category, sms):
row['sms_clean'] = '**MARKED AS SPAM** ' + row['sms'][0:50] + ( if category == 'spam':
'...' if len(row['sms']) > 50 else '' sms_clean = '**MARKED AS SPAM** ' + sms[0:50] + (
'...' if len(sms) > 50 else ''
) )
elif category == 'ham':
sms_clean = sms
else: else:
row['sms_clean'] = row['sms'] raise ValueError('Unknown category {!r}.'.format(category))
return row['sms_clean'] return category, sms, sms_clean
graph = bonobo.Graph( def get_graph(*, _limit=(), _print=()):
# spam.pkl is within the gzipped tarball graph = bonobo.Graph()
bonobo.PickleReader('spam.pkl'),
cleanse_sms, graph.add_chain(
bonobo.PrettyPrinter(), # spam.pkl is within the gzipped tarball
) bonobo.PickleReader('spam.pkl'),
*_limit,
cleanse_sms,
*_print,
)
return graph
def get_services(): def get_services():
return {'fs': TarFS(bonobo.get_examples_path('datasets/spam.tgz'))} from ._services import get_services
return {
**get_services(),
'fs': TarFS(bonobo.get_examples_path('datasets/spam.tgz'))
}
if __name__ == '__main__': if __name__ == '__main__':
bonobo.run(graph, services=get_default_services(__file__)) parser = examples.get_argument_parser()
with bonobo.parse_args(parser) as options:
bonobo.run(
get_graph(**examples.get_graph_options(options)),
services=get_services()
)

View File

@ -1,19 +1,29 @@
import bonobo import bonobo
from bonobo.commands.run import get_default_services from bonobo import examples
from bonobo.examples.files._services import get_services
def skip_comments(line): def skip_comments(line):
line = line.strip()
if not line.startswith('#'): if not line.startswith('#'):
yield line yield line
graph = bonobo.Graph( def get_graph(*, _limit=(), _print=()):
bonobo.FileReader('datasets/passwd.txt'), return bonobo.Graph(
skip_comments, bonobo.FileReader('datasets/passwd.txt'),
lambda s: s.split(':'), skip_comments,
lambda l: l[0], *_limit,
print, lambda s: s.split(':')[0],
) *_print,
bonobo.FileWriter('usernames.txt', fs='fs.output'),
)
if __name__ == '__main__': if __name__ == '__main__':
bonobo.run(graph, services=get_default_services(__file__)) parser = examples.get_argument_parser()
with bonobo.parse_args(parser) as options:
bonobo.run(
get_graph(**examples.get_graph_options(options)),
services=get_services()
)

View File

@ -1,5 +0,0 @@
from bonobo import get_examples_path, open_fs
def get_services():
return {'fs': open_fs(get_examples_path())}

View File

@ -1,41 +0,0 @@
"""
Example on how to use :class:`bonobo.Bag` instances to pass flexible args/kwargs to the next callable.
.. graphviz::
digraph {
rankdir = LR;
stylesheet = "../_static/graphs.css";
BEGIN [shape="point"];
BEGIN -> "extract()" -> "transform(...)" -> "load(...)";
}
"""
from random import randint
from bonobo import Bag, Graph
def extract():
yield Bag(topic='foo')
yield Bag(topic='bar')
yield Bag(topic='baz')
def transform(topic: str):
return Bag.inherit(title=topic.title(), rand=randint(10, 99))
def load(topic: str, title: str, rand: int):
print('{} ({}) wait={}'.format(title, topic, rand))
graph = Graph()
graph.add_chain(extract, transform, load)
if __name__ == '__main__':
from bonobo import run
run(graph)

View File

@ -1,21 +0,0 @@
"""
Simple example of :func:`bonobo.count` usage.
.. graphviz::
digraph {
rankdir = LR;
stylesheet = "../_static/graphs.css";
BEGIN [shape="point"];
BEGIN -> "range()" -> "count" -> "print";
}
"""
import bonobo
graph = bonobo.Graph(range(42), bonobo.count, print)
if __name__ == '__main__':
bonobo.run(graph)

View File

@ -1,43 +0,0 @@
"""
Example on how to use simple python dictionaries to communicate between transformations.
.. graphviz::
digraph {
rankdir = LR;
stylesheet = "../_static/graphs.css";
BEGIN [shape="point"];
BEGIN -> "extract()" -> "transform(row: dict)" -> "load(row: dict)";
}
"""
from random import randint
from bonobo import Graph
def extract():
yield {'topic': 'foo'}
yield {'topic': 'bar'}
yield {'topic': 'baz'}
def transform(row: dict):
return {
'topic': row['topic'].title(),
'randint': randint(10, 99),
}
def load(row: dict):
print(row)
graph = Graph(extract, transform, load)
if __name__ == '__main__':
from bonobo import run
run(graph)

View File

@ -1,18 +0,0 @@
import bonobo
from bonobo.commands.run import get_default_services
from bonobo.nodes.factory import Factory
from bonobo.nodes.io.json import JsonDictItemsReader
normalize = Factory()
normalize[0].str().title()
normalize.move(0, 'title')
normalize.move(0, 'address')
graph = bonobo.Graph(
JsonDictItemsReader('datasets/coffeeshops.json'),
normalize,
bonobo.PrettyPrinter(),
)
if __name__ == '__main__':
bonobo.run(graph, services=get_default_services(__file__))

View File

@ -1,24 +0,0 @@
import bonobo
from bonobo import Filter
class OddOnlyFilter(Filter):
def filter(self, i):
return i % 2
@Filter
def multiples_of_three(i):
return not (i % 3)
graph = bonobo.Graph(
lambda: tuple(range(50)),
OddOnlyFilter(),
multiples_of_three,
print,
)
if __name__ == '__main__':
bonobo.run(graph)

View File

@ -1,19 +0,0 @@
import bonobo
import time
from bonobo.constants import NOT_MODIFIED
def pause(*args, **kwargs):
time.sleep(0.1)
return NOT_MODIFIED
graph = bonobo.Graph(
lambda: tuple(range(20)),
pause,
print,
)
if __name__ == '__main__':
bonobo.run(graph)

View File

@ -1,39 +0,0 @@
"""
Example on how to use simple python strings to communicate between transformations.
.. graphviz::
digraph {
rankdir = LR;
stylesheet = "../_static/graphs.css";
BEGIN [shape="point"];
BEGIN -> "extract()" -> "transform(s: str)" -> "load(s: str)";
}
"""
from random import randint
from bonobo import Graph
def extract():
yield 'foo'
yield 'bar'
yield 'baz'
def transform(s: str):
return '{} ({})'.format(s.title(), randint(10, 99))
def load(s: str):
print(s)
graph = Graph(extract, transform, load)
if __name__ == '__main__':
from bonobo import run
run(graph)

View File

@ -2,15 +2,13 @@ import bonobo
def split_one(line): def split_one(line):
return line.split(', ', 1) return dict(zip(("name", "address"), line.split(', ', 1)))
graph = bonobo.Graph( graph = bonobo.Graph(
bonobo.FileReader('coffeeshops.txt'), bonobo.FileReader('coffeeshops.txt'),
split_one, split_one,
bonobo.JsonWriter( bonobo.JsonWriter('coffeeshops.json', fs='fs.output'),
'coffeeshops.json', fs='fs.output', ioformat='arg0'
),
) )

View File

@ -11,16 +11,17 @@ def split_one_to_map(line):
class MyJsonWriter(bonobo.JsonWriter): class MyJsonWriter(bonobo.JsonWriter):
prefix, suffix = '{', '}' prefix, suffix = '{', '}'
def write(self, fs, file, lineno, row): def write(self, fs, file, lineno, **row):
return bonobo.FileWriter.write( return bonobo.FileWriter.write(
self, fs, file, lineno, json.dumps(row)[1:-1] self, fs, file, lineno,
json.dumps(row)[1:-1]
) )
graph = bonobo.Graph( graph = bonobo.Graph(
bonobo.FileReader('coffeeshops.txt'), bonobo.FileReader('coffeeshops.txt'),
split_one_to_map, split_one_to_map,
MyJsonWriter('coffeeshops.json', fs='fs.output', ioformat='arg0'), MyJsonWriter('coffeeshops.json', fs='fs.output'),
) )

View File

@ -1,7 +0,0 @@
from . import bags, dicts, strings
__all__ = [
'bags',
'dicts',
'strings',
]

View File

@ -1,3 +1,7 @@
from bonobo.util.python import require import bonobo
from bonobo.examples.types.strings import get_graph
graph = require('strings').graph if __name__ == '__main__':
parser = bonobo.get_argument_parser()
with bonobo.parse_args(parser):
bonobo.run(get_graph())

View File

@ -1,41 +0,0 @@
"""
Example on how to use :class:`bonobo.Bag` instances to pass flexible args/kwargs to the next callable.
.. graphviz::
digraph {
rankdir = LR;
stylesheet = "../_static/graphs.css";
BEGIN [shape="point"];
BEGIN -> "extract()" -> "transform(...)" -> "load(...)";
}
"""
from random import randint
from bonobo import Bag, Graph
def extract():
yield Bag(topic='foo')
yield Bag(topic='bar')
yield Bag(topic='baz')
def transform(topic: str):
return Bag.inherit(title=topic.title(), rand=randint(10, 99))
def load(topic: str, title: str, rand: int):
print('{} ({}) wait={}'.format(title, topic, rand))
graph = Graph()
graph.add_chain(extract, transform, load)
if __name__ == '__main__':
from bonobo import run
run(graph)

View File

@ -1,43 +0,0 @@
"""
Example on how to use simple python dictionaries to communicate between transformations.
.. graphviz::
digraph {
rankdir = LR;
stylesheet = "../_static/graphs.css";
BEGIN [shape="point"];
BEGIN -> "extract()" -> "transform(row: dict)" -> "load(row: dict)";
}
"""
from random import randint
from bonobo import Graph
def extract():
yield {'topic': 'foo'}
yield {'topic': 'bar'}
yield {'topic': 'baz'}
def transform(row: dict):
return {
'topic': row['topic'].title(),
'randint': randint(10, 99),
}
def load(row: dict):
print(row)
graph = Graph(extract, transform, load)
if __name__ == '__main__':
from bonobo import run
run(graph)

View File

@ -14,7 +14,7 @@ Example on how to use symple python strings to communicate between transformatio
""" """
from random import randint from random import randint
from bonobo import Graph import bonobo
def extract(): def extract():
@ -23,17 +23,19 @@ def extract():
yield 'baz' yield 'baz'
def transform(s: str): def transform(s):
return '{} ({})'.format(s.title(), randint(10, 99)) return '{} ({})'.format(s.title(), randint(10, 99))
def load(s: str): def load(s):
print(s) print(s)
graph = Graph(extract, transform, load) def get_graph():
return bonobo.Graph(extract, transform, load)
if __name__ == '__main__': if __name__ == '__main__':
from bonobo import run parser = bonobo.get_argument_parser()
with bonobo.parse_args(parser):
run(graph) bonobo.run(get_graph())

View File

@ -1 +1,5 @@
from bonobo.execution.graph import GraphExecutionContext, NodeExecutionContext, PluginExecutionContext import logging
logger = logging.getLogger(__name__)
__all__ = []

View File

@ -1,112 +0,0 @@
import traceback
from contextlib import contextmanager
from time import sleep
from bonobo.config import create_container
from bonobo.config.processors import ContextCurrifier
from bonobo.plugins import get_enhancers
from bonobo.util.errors import print_error
from bonobo.util.objects import Wrapper, get_name
@contextmanager
def recoverable(error_handler):
try:
yield
except Exception as exc: # pylint: disable=broad-except
error_handler(exc, traceback.format_exc())
@contextmanager
def unrecoverable(error_handler):
try:
yield
except Exception as exc: # pylint: disable=broad-except
error_handler(exc, traceback.format_exc())
raise # raise unrecoverableerror from x ?
class LoopingExecutionContext(Wrapper):
alive = True
PERIOD = 0.25
@property
def started(self):
return self._started
@property
def stopped(self):
return self._stopped
def __init__(self, wrapped, parent, services=None):
super().__init__(wrapped)
self.parent = parent
if services:
if parent:
raise RuntimeError(
'Having services defined both in GraphExecutionContext and child NodeExecutionContext is not supported, for now.'
)
self.services = create_container(services)
else:
self.services = None
self._started, self._stopped = False, False
self._stack = None
# XXX enhancers
self._enhancers = get_enhancers(self.wrapped)
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type=None, exc_val=None, exc_tb=None):
self.stop()
def start(self):
if self.started:
raise RuntimeError('Cannot start a node twice ({}).'.format(get_name(self)))
self._started = True
self._stack = ContextCurrifier(self.wrapped, *self._get_initial_context())
self._stack.setup(self)
for enhancer in self._enhancers:
with unrecoverable(self.handle_error):
enhancer.start(self)
def loop(self):
"""Generic loop. A bit boring. """
while self.alive:
self.step()
sleep(self.PERIOD)
def step(self):
"""Left as an exercise for the children."""
raise NotImplementedError('Abstract.')
def stop(self):
if not self.started:
raise RuntimeError('Cannot stop an unstarted node ({}).'.format(get_name(self)))
if self._stopped:
return
try:
if self._stack:
self._stack.teardown()
finally:
self._stopped = True
def handle_error(self, exc, trace):
return print_error(exc, trace, context=self.wrapped)
def _get_initial_context(self):
if self.parent:
return self.parent.services.args_for(self.wrapped)
if self.services:
return self.services.args_for(self.wrapped)
return ()

View File

@ -0,0 +1,9 @@
from bonobo.execution.contexts.graph import GraphExecutionContext
from bonobo.execution.contexts.node import NodeExecutionContext
from bonobo.execution.contexts.plugin import PluginExecutionContext
__all__ = [
'GraphExecutionContext',
'NodeExecutionContext',
'PluginExecutionContext',
]

View File

@ -0,0 +1,136 @@
import logging
import sys
from contextlib import contextmanager
from logging import ERROR
from mondrian import term
from bonobo.util import deprecated
from bonobo.util.objects import Wrapper, get_name
@contextmanager
def recoverable(error_handler):
    """
    Context manager that reports an ``Exception`` raised in its body, then suppresses it.

    :param error_handler: callable invoked as ``error_handler(exc_type, exc_value, traceback, level=ERROR)``
    """
    try:
        yield
    except Exception:  # pylint: disable=broad-except
        exc_type, exc_value, exc_tb = sys.exc_info()
        error_handler(exc_type, exc_value, exc_tb, level=ERROR)
@contextmanager
def unrecoverable(error_handler):
    """
    Context manager that reports an ``Exception`` raised in its body, then lets it propagate.

    :param error_handler: callable invoked as ``error_handler(exc_type, exc_value, traceback, level=ERROR)``
    """
    try:
        yield
    except Exception:  # pylint: disable=broad-except
        exc_type, exc_value, exc_tb = sys.exc_info()
        error_handler(exc_type, exc_value, exc_tb, level=ERROR)
        raise  # raise unrecoverableerror from x ?
class Lifecycle:
    """
    Minimal start / stop / kill state machine.

    State lives in four boolean flags (started, stopped, killed, defunct), all false at creation and exposed
    through read-only properties. Instances can be used as context managers: entering calls ``start()`` and
    leaving calls ``stop()``.
    """

    def __init__(self):
        self._started = False
        self._stopped = False
        self._killed = False
        self._defunct = False

    @property
    def started(self):
        return self._started

    @property
    def stopped(self):
        return self._stopped

    @property
    def killed(self):
        return self._killed

    @property
    def defunct(self):
        return self._defunct

    @property
    def alive(self):
        """True once started and as long as not stopped."""
        return self._started and not self._stopped

    @property
    def should_loop(self):
        """True unless this context is defunct or killed."""
        # TODO XXX started/stopped?
        return not any((self.defunct, self.killed))

    @property
    def status(self):
        """One character status for this node. """
        if self._defunct:
            return '!'
        if not self.started:
            return ' '
        if not self.stopped:
            return '+'
        return '-'

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type=None, exc_val=None, exc_tb=None):
        # Returns None, so an exception raised inside the ``with`` body is not suppressed.
        self.stop()

    def get_flags_as_string(self):
        """Return a colored textual flag for the most significant state (defunct, killed, done), or ''."""
        if self._defunct:
            return term.red('[defunct]')
        if self.killed:
            return term.lightred('[killed]')
        if self.stopped:
            return term.lightblack('[done]')
        return ''

    def start(self):
        """
        Mark this context as started.

        :raises RuntimeError: if it was already started
        """
        if self.started:
            raise RuntimeError('This context is already started ({}).'.format(get_name(self)))

        self._started = True

    def stop(self):
        """
        Mark this context as stopped. Calling it again on a stopped context is a no-op.

        :raises RuntimeError: if the context was never started
        """
        if not self.started:
            raise RuntimeError('This context cannot be stopped as it never started ({}).'.format(get_name(self)))

        # The guard has to come before the assignment, otherwise it is always true and guards nothing.
        if self._stopped:  # Stopping twice has no effect
            return

        self._stopped = True

    def kill(self):
        """
        Flag this context as killed.

        :raises RuntimeError: if the context is not started, or is already stopped
        """
        if not self.started:
            raise RuntimeError('Cannot kill an unstarted context.')

        if self.stopped:
            raise RuntimeError('Cannot kill a stopped context.')

        self._killed = True

    @deprecated
    def handle_error(self, exctype, exc, tb, *, level=logging.ERROR):
        """Deprecated three-argument form; forwards to ``error()`` with the arguments packed as one tuple."""
        return self.error((exctype, exc, tb), level=level)

    def error(self, exc_info, *, level=logging.ERROR):
        """Log ``repr(self)`` at ``level`` with the given exception info attached."""
        logging.getLogger(__name__).log(level, repr(self), exc_info=exc_info)

    def fatal(self, exc_info, *, level=logging.CRITICAL):
        """Log like ``error()`` (default level CRITICAL), then mark this context as defunct."""
        logging.getLogger(__name__).log(level, repr(self), exc_info=exc_info)
        self._defunct = True

    def as_dict(self):
        """Return status, name, statistics and flags as a dict."""
        # NOTE(review): ``name`` and ``get_statistics_as_string`` are not defined in this class; presumably
        # they are provided by subclasses or mixins -- confirm.
        return {
            'status': self.status,
            'name': self.name,
            'stats': self.get_statistics_as_string(),
            'flags': self.get_flags_as_string(),
        }
class BaseContext(Lifecycle, Wrapper):
    """Lifecycle-aware wrapper around an object, optionally attached to a parent context."""

    def __init__(self, wrapped, *, parent=None):
        """
        :param wrapped: object handed to ``Wrapper.__init__``
        :param parent: parent context, stored as ``self.parent`` (defaults to None)
        """
        # Each base is initialised by an explicit call rather than through super().
        Lifecycle.__init__(self)
        Wrapper.__init__(self, wrapped)

        self.parent = parent

View File

@ -0,0 +1,115 @@
from functools import partial
from time import sleep
from bonobo.config import create_container
from bonobo.constants import BEGIN, END
from bonobo.execution import events
from bonobo.execution.contexts.node import NodeExecutionContext
from bonobo.execution.contexts.plugin import PluginExecutionContext
from whistle import EventDispatcher
class GraphExecutionContext:
    """
    Execution context of a whole graph.

    Holds one node execution context per graph node, one plugin execution context per plugin, a services
    container and an event dispatcher through which lifecycle events are announced.
    """

    NodeExecutionContextType = NodeExecutionContext
    PluginExecutionContextType = PluginExecutionContext

    TICK_PERIOD = 0.25

    @property
    def started(self):
        """True when at least one node context is started."""
        return any(context.started for context in self.nodes)

    @property
    def stopped(self):
        """True when every node context is both started and stopped."""
        return all(context.started and context.stopped for context in self.nodes)

    @property
    def alive(self):
        """True when at least one node context is alive."""
        return any(context.alive for context in self.nodes)

    def __init__(self, graph, plugins=None, services=None, dispatcher=None):
        """
        :param graph: graph to execute; iterated to create one node context per node
        :param plugins: optional iterable of plugin instances or plugin classes
        :param services: services definition passed to ``create_container``
        :param dispatcher: event dispatcher; a new ``EventDispatcher`` is created when falsy
        """
        self.dispatcher = dispatcher or EventDispatcher()
        self.graph = graph
        self.nodes = [self.create_node_execution_context_for(node) for node in self.graph]
        self.plugins = [self.create_plugin_execution_context_for(plugin) for plugin in plugins or ()]
        self.services = create_container(services)

        # Probably not a good idea to use it unless you really know what you're doing. But you can access the context.
        self.services['__graph_context'] = self

        for index, context in enumerate(self):
            # Wire this node's outputs to the input queues of the nodes it feeds.
            target_indexes = self.graph.outputs_of(index)
            if len(target_indexes):
                context.outputs = [self[target].input for target in target_indexes]

            # Input queue hooks: BEGIN / END are forwarded as control messages, finalize stops the node.
            queue = context.input
            queue.on_begin = partial(context._send, BEGIN, _control=True)
            queue.on_end = partial(context._send, END, _control=True)
            queue.on_finalize = partial(context.stop)

    def __getitem__(self, item):
        return self.nodes[item]

    def __len__(self):
        return len(self.nodes)

    def __iter__(self):
        yield from self.nodes

    def create_node_execution_context_for(self, node):
        """Wrap ``node`` in a ``NodeExecutionContextType`` whose parent is this context."""
        return self.NodeExecutionContextType(node, parent=self)

    def create_plugin_execution_context_for(self, plugin):
        """Wrap ``plugin`` (instantiated first when a class is given) in a ``PluginExecutionContextType``."""
        instance = plugin() if isinstance(plugin, type) else plugin
        return self.PluginExecutionContextType(instance, parent=self)

    def write(self, *messages):
        """Push a list of messages in the inputs of this graph's inputs, matching the output of special node "BEGIN" in
        our graph."""

        for index in self.graph.outputs_of(BEGIN):
            entry_context = self[index]
            for message in messages:
                entry_context.write(message)

    def dispatch(self, name):
        """Dispatch the event ``name`` with an ``ExecutionEvent`` built around this context."""
        event = events.ExecutionEvent(self)
        self.dispatcher.dispatch(name, event)

    def start(self, starter=None):
        """
        Register plugins, announce START, tick once without pausing, start every node, announce STARTED.

        :param starter: optional callable applied to each node context instead of calling its ``start()``
        """
        self.register_plugins()
        self.dispatch(events.START)
        self.tick(pause=False)
        for context in self.nodes:
            if starter is not None:
                starter(context)
            else:
                context.start()
        self.dispatch(events.STARTED)

    def tick(self, pause=True):
        """Announce TICK, then sleep for ``TICK_PERIOD`` seconds unless ``pause`` is false."""
        self.dispatch(events.TICK)
        if pause:
            sleep(self.TICK_PERIOD)

    def kill(self):
        """Announce KILL, kill every node context, then tick."""
        self.dispatch(events.KILL)
        for context in self.nodes:
            context.kill()
        self.tick()

    def stop(self, stopper=None):
        """
        Announce STOP, stop every node, tick once without pausing, announce STOPPED, unregister plugins.

        :param stopper: optional callable applied to each node context instead of calling its ``stop()``
        """
        self.dispatch(events.STOP)
        for context in self.nodes:
            if stopper is not None:
                stopper(context)
            else:
                context.stop()
        self.tick(pause=False)
        self.dispatch(events.STOPPED)
        self.unregister_plugins()

    def register_plugins(self):
        for plugin_context in self.plugins:
            plugin_context.register()

    def unregister_plugins(self):
        for plugin_context in self.plugins:
            plugin_context.unregister()

View File

@ -0,0 +1,377 @@
import logging
import sys
from collections import namedtuple
from queue import Empty
from time import sleep
from types import GeneratorType
from bonobo.config import create_container
from bonobo.config.processors import ContextCurrifier
from bonobo.constants import NOT_MODIFIED, BEGIN, END, TICK_PERIOD, Token, Flag, INHERIT
from bonobo.errors import InactiveReadableError, UnrecoverableError, UnrecoverableTypeError
from bonobo.execution.contexts.base import BaseContext
from bonobo.structs.inputs import Input
from bonobo.util import get_name, isconfigurabletype, ensure_tuple
from bonobo.util.bags import BagType
from bonobo.util.statistics import WithStatistics
logger = logging.getLogger(__name__)
UnboundArguments = namedtuple('UnboundArguments', ['args', 'kwargs'])
class NodeExecutionContext(BaseContext, WithStatistics):
def __init__(self, wrapped, *, parent=None, services=None, _input=None, _outputs=None):
    """
    Node execution context has the responsibility of storing the state of a transformation during its execution.

    :param wrapped: wrapped transformation
    :param parent: parent context, most probably a graph context
    :param services: dict-like collection of services
    :param _input: input queue (optional)
    :param _outputs: output queues (optional)
    :raises RuntimeError: if services are given while a (truthy) parent is set
    """
    BaseContext.__init__(self, wrapped, parent=parent)
    WithStatistics.__init__(self, 'in', 'out', 'err', 'warn')

    # Services: how we'll access external dependencies
    if services and self.parent:
        raise RuntimeError(
            'Having services defined both in GraphExecutionContext and child NodeExecutionContext is not supported, for now.'
        )
    self.services = create_container(services) if services else None

    # Input / Output: how the wrapped node will communicate
    self.input = _input or Input()
    self.outputs = _outputs or []

    # Types
    self._input_type = None
    self._input_length = None
    self._output_type = None

    # Stack: context decorators for the execution
    self._stack = None
def __str__(self):
    """Return ``self.__name__`` followed by the statistics string requested with ``prefix=' '``."""
    name = self.__name__
    stats = self.get_statistics_as_string(prefix=' ')
    return name + stats
def __repr__(self):
    """Return ``<TypeName(<status><name>)<stats>>`` where stats are requested with ``prefix=' '``."""
    name = get_name(self)
    type_name = get_name(type(self))
    status = self.status
    stats = self.get_statistics_as_string(prefix=' ')
    return '<{}({}{}){}>'.format(type_name, status, name, stats)
def start(self):
    """
    Starts this context, a.k.a the phase where you setup everything which will be necessary during the whole
    lifetime of a transformation.

    The "ContextCurrifier" is in charge of setting up a decorating stack, that includes both services and context
    processors, and will call the actual node callable with additional parameters.

    :raises TypeError: if the wrapped object is still a configurable type instead of an instance
    """
    super().start()

    try:
        unbound = self._get_initial_context()
        self._stack = ContextCurrifier(self.wrapped, *unbound.args, **unbound.kwargs)

        if isconfigurabletype(self.wrapped):
            # Not normal to have a partially configured object here, so let's warn the user instead of letting
            # them get into the hard trouble of understanding that by themselves.
            message = 'Configurables should be instanciated before execution starts.\nGot {!r}.\n'.format(
                self.wrapped
            )
            raise TypeError(message)

        self._stack.setup(self)
    except Exception:
        # Set the logging level to the lowest possible, to avoid double log.
        self.fatal(sys.exc_info(), level=0)

        # We raise again, so the error is not ignored out of execution loops.
        raise
def loop(self):
    """
    The actual infinite loop for this transformation.

    Calls ``step()`` for as long as ``should_loop`` is true. An ``InactiveReadableError`` ends the loop, an
    empty queue makes it sleep for ``TICK_PERIOD`` and retry, and any other exception is reported through
    ``error()`` or ``fatal()``.
    """
    logger.debug('Node loop starts for {!r}.'.format(self))

    while self.should_loop:
        try:
            self.step()
        except InactiveReadableError:
            break
        except Empty:
            # Nothing to read yet, wait a bit before the next attempt.
            sleep(TICK_PERIOD)  # XXX: How do we determine this constant?
        except (NotImplementedError, UnrecoverableError):
            self.fatal(sys.exc_info())  # exit loop
        except Exception:  # pylint: disable=broad-except
            self.error(sys.exc_info())  # does not exit loop
        except BaseException:
            self.fatal(sys.exc_info())  # exit loop

    logger.debug('Node loop ends for {!r}.'.format(self))
def step(self):
    """
    A single step in the loop.

    Basically gets an input bag, send it to the node, interpret the results.
    """
    # Pull and check data
    input_bag = self._get()

    # Sent through the stack
    results = self._stack(input_bag)

    # self._exec_time += timer.duration

    # Put data onto output channels
    if isinstance(results, GeneratorType):
        # if kill flag was set, stop iterating.
        while not self._killed:
            try:
                result = next(results)
            except StopIteration:
                # That's not an error, we're just done.
                break
            # Push data (in case of an iterator)
            self._send(self._cast(input_bag, result))
        return

    if results:
        # Push data (returned value)
        self._send(self._cast(input_bag, results))

    # A falsy result sends nothing: an execution went through anyway, use for stats.
    # self._exec_count += 1
def stop(self):
    """
    Cleanup the context, after the loop ended.

    Tears down the stack when there is one, reporting any failure through ``fatal()``, then runs the parent
    class ``stop()``.
    """
    stack = self._stack
    if stack:
        try:
            stack.teardown()
        except BaseException:  # same reach as a bare ``except:``
            self.fatal(sys.exc_info())

    super().stop()
def send(self, *_output, _input=None):
    """
    Cast the positional values (as one tuple) against ``_input`` and hand the result to ``_send``.

    :param _output: values making up the output
    :param _input: input the output relates to (defaults to None)
    :return: whatever ``_send`` returns
    """
    message = self._cast(_input, _output)
    return self._send(message)
### Input type and fields
@property
def input_type(self):
    """Input type currently recorded for this context (read-only view of ``_input_type``)."""
    return self._input_type
def set_input_type(self, input_type):
    """
    Declare the input type of this context. It can only be done once.

    :param input_type: a regular class (``type(input_type) is type``) that subclasses ``tuple``
    :raises RuntimeError: if an input type is already recorded
    :raises UnrecoverableTypeError: if ``input_type`` is not a regular class, or not a ``tuple`` subclass
    """
    if self._input_type is not None:
        # Exceptions do not interpolate extra arguments the way logging calls do, so format the message here.
        raise RuntimeError('Cannot override input type, already have {!r}.'.format(self._input_type))

    if type(input_type) is not type:
        raise UnrecoverableTypeError('Input types must be regular python types.')

    if not issubclass(input_type, tuple):
        raise UnrecoverableTypeError('Input types must be subclasses of tuple (and act as tuples).')

    self._input_type = input_type
def get_input_fields(self):
    """Return the ``_fields`` of the input type, or None when there is no input type or it has no ``_fields``."""
    input_type = self._input_type
    if input_type and hasattr(input_type, '_fields'):
        return input_type._fields
    return None
def set_input_fields(self, fields, typename='Bag'):
    """
    Build a ``BagType`` from ``typename`` and ``fields`` and register it through ``set_input_type``.

    :param fields: field names handed to ``BagType``
    :param typename: name handed to ``BagType`` (defaults to ``'Bag'``)
    """
    bag_type = BagType(typename, fields)
    self.set_input_type(bag_type)
### Output type and fields
@property
def output_type(self):
    """Output type currently recorded for this context (read-only view of ``_output_type``)."""
    return self._output_type
def set_output_type(self, output_type):
    """
    Declare the output type of this context. It can only be done once.

    :param output_type: a regular class (``type(output_type) is type``) that subclasses ``tuple``
    :raises RuntimeError: if an output type is already recorded
    :raises UnrecoverableTypeError: if ``output_type`` is not a regular class, or not a ``tuple`` subclass
    """
    if self._output_type is not None:
        # Exceptions do not interpolate extra arguments the way logging calls do, so format the message here.
        raise RuntimeError('Cannot override output type, already have {!r}.'.format(self._output_type))

    if type(output_type) is not type:
        raise UnrecoverableTypeError('Output types must be regular python types.')

    if not issubclass(output_type, tuple):
        raise UnrecoverableTypeError('Output types must be subclasses of tuple (and act as tuples).')

    self._output_type = output_type
def get_output_fields(self):
    """Return the ``_fields`` of the output type, or None when there is no output type or it has no ``_fields``."""
    output_type = self._output_type
    if output_type and hasattr(output_type, '_fields'):
        return output_type._fields
    return None
def set_output_fields(self, fields, typename='Bag'):
    """
    Build a ``BagType`` from ``typename`` and ``fields`` and register it through ``set_output_type``.

    :param fields: field names handed to ``BagType``
    :param typename: name handed to ``BagType`` (defaults to ``'Bag'``)
    """
    bag_type = BagType(typename, fields)
    self.set_output_type(bag_type)
### Attributes
def setdefault(self, attr, value):
    """
    Set attribute ``attr`` to ``value`` unless reading it already succeeds.

    :param attr: attribute name
    :param value: value assigned only when the attribute lookup raises ``AttributeError``
    """
    if not hasattr(self, attr):
        setattr(self, attr, value)
def write(self, *messages):
    """
    Push messages to this context's input queue, in order.

    Tokens are queued untouched. Anything else goes through ``ensure_tuple`` first, using the input type as the
    tuple class when one is known.

    :param messages: messages to enqueue
    """
    for message in messages:
        if isinstance(message, Token):
            payload = message
        elif self._input_type:
            payload = ensure_tuple(message, cls=self._input_type)
        else:
            payload = ensure_tuple(message)
        self.input.put(payload)
def write_sync(self, *messages):
    """
    Write the messages enclosed between ``BEGIN`` and ``END``, then call ``step()`` once per message.

    :param messages: messages to enqueue
    """
    framed = (BEGIN, ) + messages + (END, )
    self.write(*framed)

    for _ in messages:
        self.step()
def error(self, exc_info, *, level=logging.ERROR):
    """
    Count an error on this context, then delegate to the parent class' ``error()``.

    :param exc_info: exception information, forwarded untouched to the parent implementation (presumably a
        ``sys.exc_info()`` triple -- TODO confirm against the base class)
    :param level: logging level forwarded to the parent implementation, ``logging.ERROR`` by default
    """
    # Increment the 'err' counter before the error is handed over to the parent class.
    self.increment('err')
    super().error(exc_info, level=level)
def fatal(self, exc_info, *, level=logging.CRITICAL):
    """
    Count an error on this context, delegate to the parent class' ``fatal()``, then shut the input down.

    :param exc_info: exception information, forwarded untouched to the parent implementation (``stop()`` passes
        the result of ``sys.exc_info()``)
    :param level: logging level forwarded to the parent implementation, ``logging.CRITICAL`` by default
    """
    # Increment the 'err' counter before the error is handed over to the parent class.
    self.increment('err')
    super().fatal(exc_info, level=level)
    # NOTE(review): presumably prevents this context from reading further input -- confirm in the input queue
    # implementation.
    self.input.shutdown()
def get_service(self, name):
    """
    Look a service up by name, in the parent's services if there is a parent, in our own services otherwise.

    :param name: service name
    :return: whatever the services container's ``get(name)`` returns
    """
    owner = self.parent or self
    return owner.services.get(name)
def _get(self):
    """
    Read one bag from the input queue and check that it is consistent with the bags read before.

    The type and the length of the first bag are recorded; every following bag must match both. The 'in' counter
    is only incremented once the checks passed, hence it is left untouched if the queue or a check raises.

    :return: the bag read from the input queue
    :raises UnrecoverableTypeError: if the type or the length of the bag differs from the recorded ones
    """
    bag = self.input.get()

    # Exact type comparison, on purpose: a subclass of the recorded type is considered a change.
    bag_type = type(bag)
    if self._input_type is None:
        self._input_type = bag_type
    elif bag_type is not self._input_type:
        raise UnrecoverableTypeError(
            'Input type changed between calls to {!r}.\nGot {!r} which is not of type {!r}.'.format(
                self.wrapped, bag, self._input_type
            )
        )

    # Length is a soft fallback, useful when bags are plain tuples and the type check cannot tell them apart.
    bag_length = len(bag)
    if self._input_length is None:
        self._input_length = bag_length
    elif bag_length != self._input_length:
        raise UnrecoverableTypeError(
            'Input length changed between calls to {!r}.\nExpected {} but got {}: {!r}.'.format(
                self.wrapped, self._input_length, bag_length, bag
            )
        )

    self.increment('in')  # XXX should that go before type check ?

    return bag
def _cast(self, _input, _output):
    """
    Turn what was produced for a given input into the bag that will actually be sent.

    Leading flags found in the output drive the conversion:

    * ``NOT_MODIFIED``: the input itself is returned, cast to the output type.
    * ``INHERIT``: the output is appended to the input. If no output type is known yet, it is computed with
      ``concat_types`` and stored.

    :param _input: input bag
    :param _output: mixed, may start with flags
    :return: a tuple, instance of the output type if there is one
    """
    flags, payload = split_token(_output)

    if NOT_MODIFIED in flags:
        return ensure_tuple(_input, cls=(self.output_type or tuple))

    if INHERIT in flags:
        if self._output_type is None:
            self._output_type = concat_types(self._input_type, self._input_length, self._output_type, len(payload))
        payload = _input + ensure_tuple(payload)

    return ensure_tuple(payload, cls=(self._output_type or tuple))
def _send(self, value, _control=False):
    """
    Put a value in each of this context's outputs.

    :param mixed value: message
    :param _control: if true, the 'out' counter is not incremented
    """
    counted = not _control
    if counted:
        self.increment('out')

    for queue in self.outputs:
        queue.put(value)
def _get_initial_context(self):
    """
    Build the initial, keyword-only, arguments for the wrapped object.

    Keyword arguments come from ``kwargs_for(self.wrapped)``, asked to the parent's services if there is a
    parent, or else to our own services if there are any. Without either, the arguments are empty.

    :return: UnboundArguments
    """
    if self.parent:
        container = self.parent.services
    elif self.services:
        container = self.services
    else:
        return UnboundArguments((), {})

    return UnboundArguments((), container.kwargs_for(self.wrapped))
def isflag(param):
    """
    Tell whether a value is a flag.

    :param param: any value
    :return: True if ``param`` is an instance of ``Flag``, False otherwise
    """
    return isinstance(param, Flag)
def split_token(output):
    """
    Separate the leading flags of an output from the actual data.

    Only the beginning of the output is scanned: the first value that is not a flag ends the search.

    :param output: mixed, normalized using ``ensure_tuple`` before being scanned
    :return: a ``(flags, data)`` pair, where ``flags`` is the set of leading flags and ``data`` is what remains of
        the output after them
    :raises ValueError: if a flag is misplaced (must be first, must be last), appears twice, or if data follows a
        flag that does not allow it
    """
    items = ensure_tuple(output)
    seen = set()
    data_allowed = True
    position = 0

    for item in items:
        if not isflag(item):
            break

        if item.must_be_first and position:
            raise ValueError('{} flag must be first.'.format(item))

        if position and items[position - 1].must_be_last:
            raise ValueError('{} flag must be last.'.format(items[position - 1]))

        if item in seen:
            raise ValueError('Duplicate flag {}.'.format(item))

        seen.add(item)
        data_allowed &= item.allows_data
        position += 1

    data = items[position:]

    if not data_allowed and len(data):
        raise ValueError('Output data provided after a flag that does not allow data.')

    return seen, data
def concat_types(t1, l1, t2, l2):
    """
    Compute the type of a bag made by concatenating a bag of type ``t1`` with a bag of type ``t2``.

    :param t1: type of the left bag, anything falsy meaning ``tuple``
    :param l1: length of the left bag, only used if ``t1`` has no ``_fields``
    :param t2: type of the right bag, anything falsy meaning ``tuple``
    :param l2: length of the right bag, only used if ``t2`` has no ``_fields``
    :return: ``tuple`` if both sides are plain tuples, else a ``BagType`` named ``'Inherited'`` whose fields are
        the left fields followed by the right fields
    """
    left, right = t1 or tuple, t2 or tuple

    if left == right == tuple:
        return tuple

    def fields_of(bag_type, length):
        # A type without named fields gets its positions (0, 1, ...) as field names.
        if hasattr(bag_type, '_fields'):
            return bag_type._fields
        return tuple(range(length))

    left_fields = fields_of(left, l1)
    right_fields = fields_of(right, l2)

    return BagType('Inherited', left_fields + right_fields)

View File

@ -0,0 +1,13 @@
from bonobo.execution.contexts.base import BaseContext
class PluginExecutionContext(BaseContext):
    """Execution context wrapping a plugin, registering it on (and unregistering it from) the parent's dispatcher."""

    @property
    def dispatcher(self):
        """Dispatcher of the parent context."""
        return self.parent.dispatcher

    def register(self):
        """Call the wrapped plugin's ``register()`` with the parent's dispatcher, and return its result."""
        return self.wrapped.register(self.dispatcher)

    def unregister(self):
        """Call the wrapped plugin's ``unregister()`` with the parent's dispatcher, and return its result."""
        return self.wrapped.unregister(self.dispatcher)

View File

@ -0,0 +1,13 @@
from whistle import Event
# Names of the execution lifecycle events.
# NOTE(review): presumably used as event identifiers with the whistle dispatcher -- confirm at the dispatch sites.
START = 'execution.start'
STARTED = 'execution.started'
TICK = 'execution.tick'
STOP = 'execution.stop'
STOPPED = 'execution.stopped'
KILL = 'execution.kill'
class ExecutionEvent(Event):
    """Event that carries an execution context."""

    def __init__(self, context):
        """
        :param context: the context this event is about, exposed as ``self.context``
        """
        self.context = context

View File

@ -1,67 +0,0 @@
from functools import partial
from bonobo.config import create_container
from bonobo.constants import BEGIN, END
from bonobo.execution.node import NodeExecutionContext
from bonobo.execution.plugin import PluginExecutionContext
class GraphExecutionContext:
    """
    Execution context of a graph.

    It holds one ``NodeExecutionContext`` per node of the graph and one ``PluginExecutionContext`` per plugin, and
    behaves as a sequence of its node contexts (indexing, length, iteration).
    """

    def __init__(self, graph, plugins=None, services=None):
        """
        :param graph: graph to execute, iterated to get its nodes and queried with ``outputs_of(...)`` to wire them
        :param plugins: optional iterable of plugins, each one gets wrapped in a ``PluginExecutionContext``
        :param services: passed to ``create_container`` to build the services container
        """
        self.graph = graph
        self.nodes = [NodeExecutionContext(graph_node, parent=self) for graph_node in self.graph]
        self.plugins = [PluginExecutionContext(each, parent=self) for each in plugins or ()]
        self.services = create_container(services)

        # Probably not a good idea to use it unless you really know what you're doing. But you can access the context.
        self.services['__graph_context'] = self

        # Wire the nodes together: the outputs of a node are the inputs of the nodes following it in the graph,
        # and the input queue callbacks are bound to the node context's own send() and stop().
        for index, context in enumerate(self):
            context.outputs = [self[target].input for target in self.graph.outputs_of(index)]
            context.input.on_begin = partial(context.send, BEGIN, _control=True)
            context.input.on_end = partial(context.send, END, _control=True)
            context.input.on_finalize = partial(context.stop)

    @property
    def started(self):
        """True as soon as one node context is started."""
        return any(context.started for context in self.nodes)

    @property
    def stopped(self):
        """True once every node context has been both started and stopped."""
        return all(context.started and context.stopped for context in self.nodes)

    @property
    def alive(self):
        """True as long as one node context is alive."""
        return any(context.alive for context in self.nodes)

    def __getitem__(self, item):
        return self.nodes[item]

    def __len__(self):
        return len(self.nodes)

    def __iter__(self):
        for context in self.nodes:
            yield context

    def write(self, *messages):
        """
        Push a list of messages in the inputs of this graph's inputs, that is the nodes the graph gives as outputs
        of the special "BEGIN" node.

        :param messages: messages to write, one ``write()`` call being made per node and per message
        """
        for index in self.graph.outputs_of(BEGIN):
            for message in messages:
                self[index].write(message)

    def start(self):
        """Start each node context, in order."""
        # todo use strategy
        for context in self.nodes:
            context.start()

    def stop(self):
        """Stop each node context, in order."""
        # todo use strategy
        for context in self.nodes:
            context.stop()

    def loop(self):
        """Run the loop of each node context, in order."""
        # todo use strategy
        for context in self.nodes:
            context.loop()

Some files were not shown because too many files have changed in this diff Show More