Merge pull request #40 from hartym/0.2

Filesystem as a service and service configuration for directories/files (#37, #38).
This commit is contained in:
Romain Dorgueil
2017-04-27 22:55:14 -07:00
committed by GitHub
41 changed files with 481 additions and 320 deletions

View File

@ -1,7 +1,7 @@
# This file has been auto-generated. # This file has been auto-generated.
# All changes will be lost, see Projectfile. # All changes will be lost, see Projectfile.
# #
# Updated at 2017-04-25 23:05:05.062813 # Updated at 2017-04-28 06:33:29.712011
PYTHON ?= $(shell which python) PYTHON ?= $(shell which python)
PYTHON_BASENAME ?= $(shell basename $(PYTHON)) PYTHON_BASENAME ?= $(shell basename $(PYTHON))

View File

@ -21,10 +21,11 @@ enable_features = {
} }
install_requires = [ install_requires = [
'colorama >=0.3,<0.4', 'colorama ==0.3.9',
'psutil >=5.2,<5.3', 'fs ==2.0.3',
'requests >=2.13,<2.14', 'psutil ==5.2.2',
'stevedore >=1.19,<1.20', 'requests ==2.13.0',
'stevedore ==1.21.0',
] ]
extras_require = { extras_require = {
@ -33,8 +34,7 @@ extras_require = {
'ipywidgets >=6.0.0.beta5' 'ipywidgets >=6.0.0.beta5'
], ],
'dev': [ 'dev': [
'coverage >=4.3,<4.4', 'coverage >=4,<5',
'mock >=2.0,<2.1',
'pylint >=1,<2', 'pylint >=1,<2',
'pytest >=3,<4', 'pytest >=3,<4',
'pytest-cov >=2,<3', 'pytest-cov >=2,<3',

View File

@ -1,7 +1,7 @@
#! /bin/bash #! /bin/bash
__PATH__=$(cd $(dirname "$0")/..; pwd) __PATH__=$(cd $(dirname "$0")/..; pwd)
EXAMPLES=$(cd $__PATH__; find bonobo/examples -name \*.py -not -name __init__.py) EXAMPLES=$(cd $__PATH__; find bonobo/examples -name \*.py -not -name _\*)
for example in $EXAMPLES; do for example in $EXAMPLES; do
echo "===== $example =====" echo "===== $example ====="

View File

@ -7,113 +7,10 @@
"""Bonobo data-processing toolkit main module.""" """Bonobo data-processing toolkit main module."""
import sys import sys
import warnings
assert (sys.version_info >= (3, 5)), 'Python 3.5+ is required to use Bonobo.' assert (sys.version_info >= (3, 5)), 'Python 3.5+ is required to use Bonobo.'
from bonobo._api import *
from bonobo._api import __all__
from ._version import __version__ __all__ = __all__
from .basics import __all__ as __all_basics__
from .config import __all__ as __all_config__
from .execution import __all__ as __all_execution__
from .io import __all__ as __all_io__
from .strategies import __all__ as __all_strategies__
__all__ = __all_basics__ + __all_config__ + __all_execution__ + __all_io__ + __all_strategies__ + [
'Bag',
'ErrorBag'
'Graph',
'Token',
'__version__',
'create_strategy',
'get_examples_path',
'run',
]
from .basics import *
from .config import *
from .execution import *
from .io import *
from .strategies import *
from .structs.bags import *
from .structs.graphs import *
from .structs.tokens import *
DEFAULT_STRATEGY = 'threadpool'
STRATEGIES = {
'naive': NaiveStrategy,
'processpool': ProcessPoolExecutorStrategy,
'threadpool': ThreadPoolExecutorStrategy,
}
def get_examples_path(*pathsegments):
import os
import pathlib
return str(pathlib.Path(os.path.dirname(__file__), 'examples', *pathsegments))
def create_strategy(name=None):
"""
Create a strategy, or just returns it if it's already one.
:param name:
:return: Strategy
"""
from bonobo.strategies.base import Strategy
import logging
if isinstance(name, Strategy):
return name
if name is None:
name = DEFAULT_STRATEGY
logging.debug('Creating strategy {}...'.format(name))
try:
factory = STRATEGIES[name]
except KeyError as exc:
raise RuntimeError(
'Invalid strategy {}. Available choices: {}.'.format(repr(name), ', '.join(sorted(STRATEGIES.keys())))
) from exc
return factory()
def _is_interactive_console():
import sys
return sys.stdout.isatty()
def _is_jupyter_notebook():
try:
return get_ipython().__class__.__name__ == 'ZMQInteractiveShell'
except NameError:
return False
def run(graph, *chain, strategy=None, plugins=None, services=None):
if len(chain):
warnings.warn('DEPRECATED. You should pass a Graph instance instead of a chain.')
from bonobo import Graph
graph = Graph(graph, *chain)
strategy = create_strategy(strategy)
plugins = []
if _is_interactive_console():
from bonobo.ext.console import ConsoleOutputPlugin
if ConsoleOutputPlugin not in plugins:
plugins.append(ConsoleOutputPlugin)
if _is_jupyter_notebook():
from bonobo.ext.jupyter import JupyterOutputPlugin
if JupyterOutputPlugin not in plugins:
plugins.append(JupyterOutputPlugin)
return strategy.execute(graph, plugins=plugins, services=services)
del sys del sys
del warnings

80
bonobo/_api.py Normal file
View File

@ -0,0 +1,80 @@
from bonobo._version import __version__
__all__ = [
'__version__',
]
from bonobo.structs import Bag, Graph
__all__ += ['Bag', 'Graph']
# Filesystem. This is a shortcut from the excellent filesystem2 library, that we make available there for convenience.
from fs import open_fs as _open_fs
open_fs = lambda url, *args, **kwargs: _open_fs(str(url), *args, **kwargs)
__all__ += ['open_fs']
# Basic transformations.
from bonobo.basics import *
from bonobo.basics import __all__ as _all_basics
__all__ += _all_basics
# Execution strategies.
from bonobo.strategies import create_strategy
__all__ += ['create_strategy']
# Extract and loads from stdlib.
from bonobo.io import *
from bonobo.io import __all__ as _all_io
__all__ += _all_io
# XXX This may be belonging to the bonobo.examples package.
def get_examples_path(*pathsegments):
import os
import pathlib
return str(pathlib.Path(os.path.dirname(__file__), 'examples', *pathsegments))
__all__.append(get_examples_path.__name__)
def _is_interactive_console():
import sys
return sys.stdout.isatty()
def _is_jupyter_notebook():
try:
return get_ipython().__class__.__name__ == 'ZMQInteractiveShell'
except NameError:
return False
# @api
def run(graph, *chain, strategy=None, plugins=None, services=None):
if len(chain):
warnings.warn('DEPRECATED. You should pass a Graph instance instead of a chain.')
from bonobo import Graph
graph = Graph(graph, *chain)
strategy = create_strategy(strategy)
plugins = []
if _is_interactive_console():
from bonobo.ext.console import ConsoleOutputPlugin
if ConsoleOutputPlugin not in plugins:
plugins.append(ConsoleOutputPlugin)
if _is_jupyter_notebook():
from bonobo.ext.jupyter import JupyterOutputPlugin
if JupyterOutputPlugin not in plugins:
plugins.append(JupyterOutputPlugin)
return strategy.execute(graph, plugins=plugins, services=services)
__all__.append(run.__name__)

View File

@ -19,6 +19,7 @@ __all__ = [
'noop', 'noop',
] ]
def identity(x): def identity(x):
return x return x

View File

@ -1,6 +1,33 @@
import argparse import argparse
import os
import bonobo import bonobo
DEFAULT_SERVICES_FILENAME = '_services.py'
DEFAULT_SERVICES_ATTR = 'get_services'
def get_default_services(filename, services=None):
dirname = os.path.dirname(filename)
services_filename = os.path.join(dirname, DEFAULT_SERVICES_FILENAME)
if os.path.exists(services_filename):
with open(services_filename) as file:
code = compile(file.read(), services_filename, 'exec')
context = {
'__name__': '__bonobo__',
'__file__': services_filename,
}
try:
exec(code, context)
except Exception as exc:
raise
return {
**context[DEFAULT_SERVICES_ATTR](),
**(services or {}),
}
return services or {}
def execute(file, quiet=False): def execute(file, quiet=False):
with file: with file:
@ -32,8 +59,7 @@ def execute(file, quiet=False):
# todo if console and not quiet, then add the console plugin # todo if console and not quiet, then add the console plugin
# todo when better console plugin, add it if console and just disable display # todo when better console plugin, add it if console and just disable display
return bonobo.run(graph, plugins=[], services=get_default_services(file.name, context.get(DEFAULT_SERVICES_ATTR)() if DEFAULT_SERVICES_ATTR in context else None))
return bonobo.run(graph)
def register(parser): def register(parser):

View File

@ -1,11 +1,12 @@
from bonobo.config.configurables import Configurable from bonobo.config.configurables import Configurable
from bonobo.config.options import Option from bonobo.config.options import Option
from bonobo.config.services import Container, Service
from bonobo.config.processors import ContextProcessor from bonobo.config.processors import ContextProcessor
from bonobo.config.services import Container, Service
__all__ = [ __all__ = [
'Configurable', 'Configurable',
'Container', 'Container',
'ContextProcessor',
'Option', 'Option',
'Service', 'Service',
] ]

View File

@ -0,0 +1,9 @@
from os.path import dirname
import bonobo
def get_services():
return {
'fs': bonobo.open_fs(dirname(__file__))
}

View File

@ -1,16 +1,14 @@
from os.path import dirname, realpath, join
import bonobo import bonobo
from bonobo.commands.run import get_default_services
from bonobo.ext.opendatasoft import OpenDataSoftAPI from bonobo.ext.opendatasoft import OpenDataSoftAPI
OUTPUT_FILENAME = realpath(join(dirname(__file__), 'coffeeshops.txt')) filename = 'coffeeshops.txt'
graph = bonobo.Graph( graph = bonobo.Graph(
OpenDataSoftAPI(dataset='liste-des-cafes-a-un-euro', netloc='opendata.paris.fr'), OpenDataSoftAPI(dataset='liste-des-cafes-a-un-euro', netloc='opendata.paris.fr'),
lambda row: '{nom_du_cafe}, {adresse}, {arrondissement} Paris, France'.format(**row), lambda row: '{nom_du_cafe}, {adresse}, {arrondissement} Paris, France'.format(**row),
bonobo.FileWriter(path=OUTPUT_FILENAME), bonobo.FileWriter(path=filename),
) )
if __name__ == '__main__': if __name__ == '__main__':
bonobo.run(graph) bonobo.run(graph, services=get_default_services(__file__))
print('Import done, read {} for results.'.format(OUTPUT_FILENAME))

View File

@ -1,11 +1,11 @@
import json import json
import os
from bonobo import JsonWriter, Graph, get_examples_path
from bonobo.basics import Tee
from bonobo.ext.opendatasoft import OpenDataSoftAPI
from colorama import Fore, Style from colorama import Fore, Style
import bonobo
from bonobo.commands.run import get_default_services
from bonobo.ext.opendatasoft import OpenDataSoftAPI
try: try:
import pycountry import pycountry
except ImportError as exc: except ImportError as exc:
@ -15,8 +15,6 @@ API_DATASET = 'fablabs-in-the-world'
API_NETLOC = 'datanova.laposte.fr' API_NETLOC = 'datanova.laposte.fr'
ROWS = 100 ROWS = 100
__path__ = os.path.dirname(__file__)
def _getlink(x): def _getlink(x):
return x.get('url', None) return x.get('url', None)
@ -55,15 +53,13 @@ def display(row):
print(' - {}source{}: {source}'.format(Fore.BLUE, Style.RESET_ALL, source='datanova/' + API_DATASET)) print(' - {}source{}: {source}'.format(Fore.BLUE, Style.RESET_ALL, source='datanova/' + API_DATASET))
graph = Graph( graph = bonobo.Graph(
OpenDataSoftAPI(dataset=API_DATASET, netloc=API_NETLOC, timezone='Europe/Paris'), OpenDataSoftAPI(dataset=API_DATASET, netloc=API_NETLOC, timezone='Europe/Paris'),
normalize, normalize,
filter_france, filter_france,
Tee(display), bonobo.Tee(display),
JsonWriter(path=get_examples_path('datasets/fablabs.txt')), bonobo.JsonWriter(path='fablabs.txt'),
) )
if __name__ == '__main__': if __name__ == '__main__':
from bonobo import run bonobo.run(graph, services=get_default_services(__file__))
run(graph)

View File

@ -0,0 +1,13 @@
import bonobo
from bonobo.commands.run import get_default_services
# XXX does not work anymore because of filesystem service, can't read HTTP
url = 'https://data.toulouse-metropole.fr/explore/dataset/theatres-et-salles-de-spectacles/download?format=json&timezone=Europe/Berlin&use_labels_for_header=true'
graph = bonobo.Graph(
bonobo.JsonReader(path=url),
print
)
if __name__ == '__main__':
bonobo.run(graph, services=get_default_services(__file__))

View File

@ -0,0 +1,7 @@
from bonobo import get_examples_path, open_fs
def get_services():
return {
'fs': open_fs(get_examples_path())
}

View File

@ -1,11 +0,0 @@
from bonobo import CsvReader, Graph, get_examples_path
graph = Graph(
CsvReader(path=get_examples_path('datasets/coffeeshops.txt')),
print,
)
if __name__ == '__main__':
import bonobo
bonobo.run(graph)

View File

@ -0,0 +1,10 @@
import bonobo
from bonobo.commands.run import get_default_services
graph = bonobo.Graph(
bonobo.CsvReader(path='datasets/coffeeshops.txt'),
print,
)
if __name__ == '__main__':
bonobo.run(graph, services=get_default_services(__file__))

View File

@ -1,8 +0,0 @@
import bonobo as bb
url = 'https://data.toulouse-metropole.fr/explore/dataset/theatres-et-salles-de-spectacles/download?format=json&timezone=Europe/Berlin&use_labels_for_header=true'
graph = bb.Graph(bb.JsonReader(path=url), print)
if __name__ == '__main__':
bb.run(graph)

View File

@ -1,20 +0,0 @@
from bonobo import FileReader, Graph, get_examples_path
def skip_comments(line):
if not line.startswith('#'):
yield line
graph = Graph(
FileReader(path=get_examples_path('datasets/passwd.txt')),
skip_comments,
lambda s: s.split(':'),
lambda l: l[0],
print,
)
if __name__ == '__main__':
import bonobo
bonobo.run(graph)

View File

@ -0,0 +1,19 @@
import bonobo
from bonobo.commands.run import get_default_services
def skip_comments(line):
if not line.startswith('#'):
yield line
graph = bonobo.Graph(
bonobo.FileReader(path='datasets/passwd.txt'),
skip_comments,
lambda s: s.split(':'),
lambda l: l[0],
print,
)
if __name__ == '__main__':
bonobo.run(graph, services=get_default_services(__file__))

View File

@ -1,9 +1,17 @@
import bonobo import bonobo
from bonobo.commands.run import get_default_services
graph = bonobo.Graph( graph = bonobo.Graph(
bonobo.FileReader(path=bonobo.get_examples_path('datasets/coffeeshops.txt')), bonobo.FileReader(path='datasets/coffeeshops.txt'),
print, print,
) )
def get_services():
return {
'fs': bonobo.open_fs(bonobo.get_examples_path())
}
if __name__ == '__main__': if __name__ == '__main__':
bonobo.run(graph) bonobo.run(graph, services=get_default_services(__file__, get_services()))

View File

@ -1,9 +1,3 @@
from bonobo.execution.graph import GraphExecutionContext, NodeExecutionContext, PluginExecutionContext from bonobo.execution.graph import GraphExecutionContext, NodeExecutionContext, PluginExecutionContext
__all__ = [
'GraphExecutionContext',
'NodeExecutionContext',
'PluginExecutionContext',
]

View File

@ -2,6 +2,7 @@ import sys
import traceback import traceback
from time import sleep from time import sleep
from bonobo.config import Container
from bonobo.config.processors import resolve_processors from bonobo.config.processors import resolve_processors
from bonobo.util.iterators import ensure_tuple from bonobo.util.iterators import ensure_tuple
from bonobo.util.objects import Wrapper from bonobo.util.objects import Wrapper
@ -23,9 +24,17 @@ class LoopingExecutionContext(Wrapper):
def stopped(self): def stopped(self):
return self._stopped return self._stopped
def __init__(self, wrapped, parent): def __init__(self, wrapped, parent, services=None):
super().__init__(wrapped) super().__init__(wrapped)
self.parent = parent self.parent = parent
if services:
if parent:
raise RuntimeError(
'Having services defined both in GraphExecutionContext and child NodeExecutionContext is not supported, for now.')
self.services = Container(services) if services else Container()
else:
self.services = None
self._started, self._stopped, self._context, self._stack = False, False, None, [] self._started, self._stopped, self._context, self._stack = False, False, None, []
def start(self): def start(self):
@ -34,7 +43,12 @@ class LoopingExecutionContext(Wrapper):
assert self._context is None assert self._context is None
self._started = True self._started = True
try: try:
self._context = self.parent.services.args_for(self.wrapped) if self.parent else () if self.parent:
self._context = self.parent.services.args_for(self.wrapped)
elif self.services:
self._context = self.services.args_for(self.wrapped)
else:
self._context = ()
except Exception as exc: # pylint: disable=broad-except except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc()) self.handle_error(exc, traceback.format_exc())
raise raise

View File

@ -2,12 +2,12 @@ import traceback
from queue import Empty from queue import Empty
from time import sleep from time import sleep
from bonobo.structs.bags import Bag, ErrorBag
from bonobo.constants import INHERIT_INPUT, NOT_MODIFIED from bonobo.constants import INHERIT_INPUT, NOT_MODIFIED
from bonobo.core.inputs import Input from bonobo.core.inputs import Input
from bonobo.core.statistics import WithStatistics from bonobo.core.statistics import WithStatistics
from bonobo.errors import InactiveReadableError from bonobo.errors import InactiveReadableError
from bonobo.execution.base import LoopingExecutionContext from bonobo.execution.base import LoopingExecutionContext
from bonobo.structs.bags import Bag, ErrorBag
from bonobo.util.iterators import iter_if_not_sequence from bonobo.util.iterators import iter_if_not_sequence
@ -21,8 +21,8 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
"""todo check if this is right, and where it is used""" """todo check if this is right, and where it is used"""
return self.input.alive and self._started and not self._stopped return self.input.alive and self._started and not self._stopped
def __init__(self, wrapped, parent): def __init__(self, wrapped, parent=None, services=None):
LoopingExecutionContext.__init__(self, wrapped, parent) LoopingExecutionContext.__init__(self, wrapped, parent=parent, services=services)
WithStatistics.__init__(self, 'in', 'out', 'err') WithStatistics.__init__(self, 'in', 'out', 'err')
self.input = Input() self.input = Input()
@ -115,9 +115,11 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
else: else:
self.push(_resolve(input_bag, result)) self.push(_resolve(input_bag, result))
def is_error(bag): def is_error(bag):
return isinstance(bag, ErrorBag) return isinstance(bag, ErrorBag)
def _resolve(input_bag, output): def _resolve(input_bag, output):
# NotModified means to send the input unmodified to output. # NotModified means to send the input unmodified to output.
if output is NOT_MODIFIED: if output is NOT_MODIFIED:

View File

@ -3,7 +3,7 @@ import csv
from bonobo.config import Option from bonobo.config import Option
from bonobo.config.processors import ContextProcessor, contextual from bonobo.config.processors import ContextProcessor, contextual
from bonobo.util.objects import ValueHolder from bonobo.util.objects import ValueHolder
from .file import FileReader, FileWriter, FileHandler from .file import FileHandler, FileReader, FileWriter
class CsvHandler(FileHandler): class CsvHandler(FileHandler):
@ -41,10 +41,10 @@ class CsvReader(CsvHandler, FileReader):
skip = Option(int, default=0) skip = Option(int, default=0)
@ContextProcessor @ContextProcessor
def csv_headers(self, context, file): def csv_headers(self, context, fs, file):
yield ValueHolder(self.headers) yield ValueHolder(self.headers)
def read(self, file, headers): def read(self, fs, file, headers):
reader = csv.reader(file, delimiter=self.delimiter, quotechar=self.quotechar) reader = csv.reader(file, delimiter=self.delimiter, quotechar=self.quotechar)
headers.value = headers.value or next(reader) headers.value = headers.value or next(reader)
field_count = len(headers.value) field_count = len(headers.value)
@ -55,7 +55,7 @@ class CsvReader(CsvHandler, FileReader):
for row in reader: for row in reader:
if len(row) != field_count: if len(row) != field_count:
raise ValueError('Got a line with %d fields, expecting %d.' % (len(row), field_count, )) raise ValueError('Got a line with %d fields, expecting %d.' % (len(row), field_count,))
yield dict(zip(headers.value, row)) yield dict(zip(headers.value, row))
@ -63,12 +63,12 @@ class CsvReader(CsvHandler, FileReader):
@contextual @contextual
class CsvWriter(CsvHandler, FileWriter): class CsvWriter(CsvHandler, FileWriter):
@ContextProcessor @ContextProcessor
def writer(self, context, file, lineno): def writer(self, context, fs, file, lineno):
writer = csv.writer(file, delimiter=self.delimiter, quotechar=self.quotechar) writer = csv.writer(file, delimiter=self.delimiter, quotechar=self.quotechar, lineterminator=self.eol)
headers = ValueHolder(list(self.headers) if self.headers else None) headers = ValueHolder(list(self.headers) if self.headers else None)
yield writer, headers yield writer, headers
def write(self, file, lineno, writer, headers, row): def write(self, fs, file, lineno, writer, headers, row):
if not lineno.value: if not lineno.value:
headers.value = headers.value or row.keys() headers.value = headers.value or row.keys()
writer.writerow(headers.value) writer.writerow(headers.value)

View File

@ -1,8 +1,6 @@
from io import BytesIO from bonobo.config import Option, Service
from bonobo.config import Option
from bonobo.config.processors import ContextProcessor, contextual
from bonobo.config.configurables import Configurable from bonobo.config.configurables import Configurable
from bonobo.config.processors import ContextProcessor, contextual
from bonobo.util.objects import ValueHolder from bonobo.util.objects import ValueHolder
__all__ = [ __all__ = [
@ -13,30 +11,34 @@ __all__ = [
@contextual @contextual
class FileHandler(Configurable): class FileHandler(Configurable):
""" """Abstract component factory for file-related components.
Abstract component factory for file-related components.
Args:
path (str): which path to use within the provided filesystem.
eol (str): which character to use to separate lines.
mode (str): which mode to use when opening the file.
fs (str): service name to use for filesystem.
""" """
path = Option(str, required=True) path = Option(str, required=True) # type: str
eol = Option(str, default='\n') eol = Option(str, default='\n') # type: str
mode = Option(str) mode = Option(str) # type: str
fs = Service('fs') # type: str
@ContextProcessor @ContextProcessor
def file(self, context): def file(self, context, fs):
if self.path.find('http://') == 0 or self.path.find('https://') == 0: with self.open(fs) as file:
import requests
response = requests.get(self.path)
yield BytesIO(response.content)
else:
with self.open() as file:
yield file yield file
def open(self): def open(self, fs):
return open(self.path, self.mode) return fs.open(self.path, self.mode)
class Reader(FileHandler): class Reader(FileHandler):
"""Abstract component factory for readers.
"""
def __call__(self, *args): def __call__(self, *args):
yield from self.read(*args) yield from self.read(*args)
@ -45,6 +47,9 @@ class Reader(FileHandler):
class Writer(FileHandler): class Writer(FileHandler):
"""Abstract component factory for writers.
"""
def __call__(self, *args): def __call__(self, *args):
return self.write(*args) return self.write(*args)
@ -53,23 +58,18 @@ class Writer(FileHandler):
class FileReader(Reader): class FileReader(Reader):
""" """Component factory for file-like readers.
Component factory for file-like readers.
On its own, it can be used to read a file and yield one row per line, trimming the "eol" character at the end if On its own, it can be used to read a file and yield one row per line, trimming the "eol" character at the end if
present. Extending it is usually the right way to create more specific file readers (like json, csv, etc.) present. Extending it is usually the right way to create more specific file readers (like json, csv, etc.)
""" """
mode = Option(str, default='r') mode = Option(str, default='r')
def read(self, file): def read(self, fs, file):
""" """
Write a row on the next line of given file. Write a row on the next line of given file.
Prefix is used for newlines. Prefix is used for newlines.
:param ctx:
:param row:
""" """
for line in file: for line in file:
yield line.rstrip(self.eol) yield line.rstrip(self.eol)
@ -77,28 +77,22 @@ class FileReader(Reader):
@contextual @contextual
class FileWriter(Writer): class FileWriter(Writer):
""" """Component factory for file or file-like writers.
Component factory for file or file-like writers.
On its own, it can be used to write in a file one line per row that comes into this component. Extending it is On its own, it can be used to write in a file one line per row that comes into this component. Extending it is
usually the right way to create more specific file writers (like json, csv, etc.) usually the right way to create more specific file writers (like json, csv, etc.)
""" """
mode = Option(str, default='w+') mode = Option(str, default='w+')
@ContextProcessor @ContextProcessor
def lineno(self, context, file): def lineno(self, context, fs, file):
lineno = ValueHolder(0, type=int) lineno = ValueHolder(0, type=int)
yield lineno yield lineno
def write(self, file, lineno, row): def write(self, fs, file, lineno, row):
""" """
Write a row on the next line of opened file in context. Write a row on the next line of opened file in context.
:param file fp:
:param str row:
:param str prefix:
""" """
self._write_line(file, (self.eol if lineno.value else '') + row) self._write_line(file, (self.eol if lineno.value else '') + row)
lineno.value += 1 lineno.value += 1

View File

@ -15,7 +15,7 @@ class JsonHandler:
class JsonReader(JsonHandler, FileReader): class JsonReader(JsonHandler, FileReader):
loader = staticmethod(json.load) loader = staticmethod(json.load)
def read(self, file): def read(self, fs, file):
for line in self.loader(file): for line in self.loader(file):
yield line yield line
@ -23,16 +23,16 @@ class JsonReader(JsonHandler, FileReader):
@contextual @contextual
class JsonWriter(JsonHandler, FileWriter): class JsonWriter(JsonHandler, FileWriter):
@ContextProcessor @ContextProcessor
def envelope(self, context, file, lineno): def envelope(self, context, fs, file, lineno):
file.write('[\n') file.write('[\n')
yield yield
file.write('\n]') file.write('\n]')
def write(self, file, lineno, row): def write(self, fs, file, lineno, row):
""" """
Write a json row on the next line of file pointed by ctx.file. Write a json row on the next line of file pointed by ctx.file.
:param ctx: :param ctx:
:param row: :param row:
""" """
return super().write(file, lineno, json.dumps(row)) return super().write(fs, file, lineno, json.dumps(row))

View File

@ -1,8 +1,42 @@
from bonobo.strategies.executor import ThreadPoolExecutorStrategy, ProcessPoolExecutorStrategy from bonobo.strategies.executor import ProcessPoolExecutorStrategy, ThreadPoolExecutorStrategy
from bonobo.strategies.naive import NaiveStrategy from bonobo.strategies.naive import NaiveStrategy
__all__ = [ __all__ = [
'NaiveStrategy', 'create_strategy',
'ProcessPoolExecutorStrategy',
'ThreadPoolExecutorStrategy',
] ]
STRATEGIES = {
'naive': NaiveStrategy,
'processpool': ProcessPoolExecutorStrategy,
'threadpool': ThreadPoolExecutorStrategy,
}
DEFAULT_STRATEGY = 'threadpool'
def create_strategy(name=None):
"""
Create a strategy, or just returns it if it's already one.
:param name:
:return: Strategy
"""
from bonobo.strategies.base import Strategy
import logging
if isinstance(name, Strategy):
return name
if name is None:
name = DEFAULT_STRATEGY
logging.debug('Creating strategy {}...'.format(name))
try:
factory = STRATEGIES[name]
except KeyError as exc:
raise RuntimeError(
'Invalid strategy {}. Available choices: {}.'.format(repr(name), ', '.join(sorted(STRATEGIES.keys())))
) from exc
return factory()

View File

@ -0,0 +1,7 @@
from bonobo.structs.bags import Bag
from bonobo.structs.graphs import Graph
from bonobo.structs.tokens import Token
__all__ = [
'Bag', 'Graph', 'Token'
]

View File

@ -4,6 +4,6 @@ from bonobo.execution.node import NodeExecutionContext
class CapturingNodeExecutionContext(NodeExecutionContext): class CapturingNodeExecutionContext(NodeExecutionContext):
def __init__(self, wrapped, parent): def __init__(self, *args, **kwargs):
super().__init__(wrapped, parent) super().__init__(*args, **kwargs)
self.send = MagicMock() self.send = MagicMock()

View File

@ -0,0 +1,9 @@
Development Status :: 3 - Alpha
Intended Audience :: Developers
Intended Audience :: Information Technology
License :: OSI Approved :: Apache Software License
Programming Language :: Python
Programming Language :: Python :: 3
Programming Language :: Python :: 3.5
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3 :: Only

View File

@ -1,23 +1,57 @@
Contributing Contributing
============ ============
Contributing to bonobo is simple. Although we don't have a complete guide on this topic for now, the best way is to fork Contributing to bonobo is usually done this way:
* Discuss ideas in the `issue tracker <https://github.com/python-bonobo/bonobo>`_ or on `Slack <https://bonobo-slack.herokuapp.com/>`_.
* Fork the `repository <https://github.com/python-bonobo>`_.
* Think about what happens for existing userland code if your patch is applied.
* Open pull request early with your code to continue the discussion as you're writing code.
* Try to write simple tests, and a few lines of documentation.
Although we don't have a complete guide on this topic for now, the best way is to fork
the github repository and send pull requests. the github repository and send pull requests.
A few guidelines... Tools
:::::
* Starting at 1.0, the system needs to be 100% backward compatible. Best way to do so is to ensure the actual expected
behavior is unit tested before making any change. See http://semver.org/.
* There can be changes before 1.0, even backward incompatible changes. There should be a reason for a BC break, but
I think it's best for the speed of development right now.
* The core should stay as light as possible.
* Coding standards are enforced using yapf. That means that you can code the way you want, we just ask you to run
`make format` before committing your changes so everybody follows the same conventions.
* General rule for anything you're not sure about is "open a github issue to discuss the point".
* More formal proposal process will come the day we feel the need for it.
Issues: https://github.com/python-bonobo/bonobo/issues Issues: https://github.com/python-bonobo/bonobo/issues
Roadmap: https://www.bonobo-project.org/roadmap Roadmap: https://www.bonobo-project.org/roadmap
Slack: https://bonobo-slack.herokuapp.com/ Slack: https://bonobo-slack.herokuapp.com/
Guidelines
::::::::::
* We tend to use `semantic versioning <http://semver.org/>`_. This should be 100% true once we reach 1.0, but until then we will fail
and learn. Anyway, the user effort for each BC-break is a real pain, and we want to keep that in mind.
* The 1.0 milestone has one goal: create a solid foundation we can rely on, in term of API. To reach that, we want to keep it as
minimalist as possible, considering only a few userland tools as the public API.
* Said simplier, the core should stay as light as possible.
* Let's not fight over coding standards. We enforce it using `yapf <https://github.com/google/yapf#yapf>`_, and a `make format` call
should reformat the whole codebase for you. We encourage you to run it before making a pull request, and it will be run before each
release anyway, so we can focus on things that have value instead of details.
* Tests are important. One obvious reason is that we want to have a stable and working system, but one less obvious reason is that
it forces better design, making sure responsibilities are well separated and scope of each function is clear. More often than not,
the "one and only obvious way to do it" will be obvious once you write the tests.
* Documentation is important. It's the only way people can actually understand what the system do, and userless software is pointless.
One book I read a long time ago said that half the energy spent building something should be devoted to explaining what and why you're
doing something, and that's probably one of the best advice I read about (although, as every good piece of advice, it's more easy to
repeat than to apply).
License
:::::::
`Bonobo is released under the apache license <https://github.com/python-bonobo/bonobo/blob/0.2/LICENSE>`_.
License for non lawyers
:::::::::::::::::::::::
Use it, change it, hack it, brew it, eat it.
For pleasure, non-profit, profit or basically anything else, except stealing credit.
Provided without warranty.

View File

@ -1,48 +1,40 @@
Pure transformations Pure transformations
==================== ====================
The nature of components, and how the data flow from one to another, make them not so easy to write correctly. The nature of components, and how the data flow from one to another, can be a bit tricky.
Hopefully, with a few hints, you will be able to understand why and how they should be written. Hopefully, they should be very easy to write with a few hints.
The major problem we have is that one message can go through more than one component, and at the same time. If you The major problem we have is that one message (underlying implementation: :class:`bonobo.structs.bags.Bag`) can go
wanna be safe, you tend to :func:`copy.copy()` everything between two calls to two different components, but that through more than one component, and at the same time. If you wanna be safe, you tend to :func:`copy.copy()` everything
will mean that a lot of useless memory space would be taken for copies that are never modified. between two calls to two different components, but that's very expensive.
Instead of that, we chosed the oposite: copies are never made, and you should not modify in place the inputs of your Instead of that, we chosed the oposite: copies are never made, and you should not modify in place the inputs of your
component before yielding them, and that mostly means that you want to recreate dicts and lists before yielding (or component before yielding them, and that mostly means that you want to recreate dicts and lists before yielding (or
returning) them. Numeric values, strings and tuples being immutable in python, modifying a variable of one of those returning) them. Numeric values, strings and tuples being immutable in python, modifying a variable of one of those
type will already return a different instance. type will already return a different instance.
Examples will be shown with `return` statements, of course you can do the same with `yield` statements in generators.
Numbers Numbers
::::::: :::::::
You can't be wrong with numbers. All of the following are correct. In python, numbers are immutable. So you can't be wrong with numbers. All of the following are correct.
.. code-block:: python .. code-block:: python
def do_your_number_thing(n: int) -> int: def do_your_number_thing(n: int) -> int:
return n return n
def do_your_number_thing(n: int) -> int:
yield n
def do_your_number_thing(n: int) -> int: def do_your_number_thing(n: int) -> int:
return n + 1 return n + 1
def do_your_number_thing(n: int) -> int:
yield n + 1
def do_your_number_thing(n: int) -> int: def do_your_number_thing(n: int) -> int:
# correct, but bad style # correct, but bad style
n += 1 n += 1
return n return n
def do_your_number_thing(n: int) -> int: The same is true with other numeric types, so don't be shy.
# correct, but bad style
n += 1
yield n
The same is true with other numeric types, so don't be shy. Operate like crazy, my friend.
Tuples Tuples
:::::: ::::::
@ -65,12 +57,27 @@ Tuples are immutable, so you risk nothing.
Strings Strings
::::::: :::::::
You know the drill, strings are immutable, blablabla ... Examples left as an exercise for the reader. You know the drill, strings are immutable.
.. code-block:: python
def do_your_str_thing(t: str) -> str:
return 'foo ' + t + ' bar'
def do_your_str_thing(t: str) -> str:
return ' '.join(('foo', t, 'bar', ))
def do_your_str_thing(t: str) -> str:
return 'foo {} bar'.format(t)
You can, if you're using python 3.6+, use `f-strings <https://docs.python.org/3/reference/lexical_analysis.html#f-strings>`_,
but the core bonobo libraries won't use it to stay 3.5 compatible.
Dicts Dicts
::::: :::::
So, now it gets interesting. Dicts are mutable. It means that you can mess things up badly here if you're not cautious. So, now it gets interesting. Dicts are mutable. It means that you can mess things up if you're not cautious.
For example, doing the following may cause unexpected problems: For example, doing the following may cause unexpected problems:
@ -86,8 +93,8 @@ For example, doing the following may cause unexpected problems:
return d return d
The problem is easy to understand: as **Bonobo** won't make copies of your dict, the same dict will be passed along the The problem is easy to understand: as **Bonobo** won't make copies of your dict, the same dict will be passed along the
transformation graph, and mutations will be seen in components downwards the output, but also upward. Let's see transformation graph, and mutations will be seen in components downwards the output (and also upward). Let's see
a more obvious example of something you should not do: a more obvious example of something you should *not* do:
.. code-block:: python .. code-block:: python
@ -98,7 +105,8 @@ a more obvious example of something you should not do:
d['index'] = i d['index'] = i
yield d yield d
Here, the same dict is yielded in each iteration, and its state when the next component in chain is called is undetermined. Here, the same dict is yielded in each iteration, and its state when the next component in chain is called is undetermined
(how many mutations happened since the `yield`? Hard to tell...).
Now let's see how to do it correctly: Now let's see how to do it correctly:
@ -120,9 +128,17 @@ Now let's see how to do it correctly:
'index': i 'index': i
} }
I hear you think «Yeah, but if I create like millions of dicts ...». The answer is simple. Using dicts like this will I hear you think «Yeah, but if I create like millions of dicts ...».
create a lot, but also free a lot because as soon as all the future components that take this dict as input are done,
the dict will be garbage collected. Youplaboum!
Let's say we chosed the oposite way and copy the dict outside the transformation (in fact, `it's what we did in bonobo's
ancestor <https://github.com/rdcli/rdc.etl/blob/dev/rdc/etl/io/__init__.py#L187>`_). This means you will also create the
same number of dicts, the difference is that you won't even notice it. Also, it means that if you want to yield 1 million
times the same dict, going "pure" makes it efficient (you'll just yield the same object 1 million times) while going "copy
crazy" will create 1 million objects.
Using dicts like this will create a lot of dicts, but also free them as soon as all the future components that take this dict
as input are done. Also, one important thing to note is that most primitive data structures in python are immutable, so creating
a new dict will of course create a new envelope, but the unchanged objects inside won't be duplicated.
Last thing, copies made in the "pure" approach are explicit, and usually, explicit is better than implicit.

View File

@ -3,7 +3,7 @@ Services and dependencies (draft implementation)
:Status: Draft implementation :Status: Draft implementation
:Stability: Alpha :Stability: Alpha
:Last-Modified: 27 apr 2017 :Last-Modified: 28 apr 2017
Most probably, you'll want to use external systems within your transformations. Those systems may include databases, Most probably, you'll want to use external systems within your transformations. Those systems may include databases,
apis (using http, for example), filesystems, etc. apis (using http, for example), filesystems, etc.
@ -82,6 +82,13 @@ A dictionary, or dictionary-like, "services" named argument can be passed to the
provided is pretty basic, and feature-less. But you can use much more evolved libraries instead of the provided provided is pretty basic, and feature-less. But you can use much more evolved libraries instead of the provided
stub, and as long as it works the same (a.k.a implements a dictionary-like interface), the system will use it. stub, and as long as it works the same (a.k.a implements a dictionary-like interface), the system will use it.
Service configuration (to be decided and implemented)
:::::::::::::::::::::::::::::::::::::::::::::::::::::
* There should be a way to configure default service implementation for a python file, a directory, a project ...
* There should be a way to override services when running a transformation.
* There should be a way to use environment for service configuration.
Future and proposals Future and proposals
:::::::::::::::::::: ::::::::::::::::::::

View File

@ -8,3 +8,4 @@ References
commands commands
api api
examples

View File

@ -41,8 +41,8 @@ setup(
description='Bonobo', description='Bonobo',
license='Apache License, Version 2.0', license='Apache License, Version 2.0',
install_requires=[ install_requires=[
'colorama >=0.3,<0.4', 'psutil >=5.2,<5.3', 'requests >=2.13,<2.14', 'colorama ==0.3.9', 'fs ==2.0.3', 'psutil ==5.2.2',
'stevedore >=1.19,<1.20' 'requests ==2.13.0', 'stevedore ==1.21.0'
], ],
version=version, version=version,
long_description=read('README.rst'), long_description=read('README.rst'),
@ -56,9 +56,9 @@ setup(
])], ])],
extras_require={ extras_require={
'dev': [ 'dev': [
'coverage >=4.3,<4.4', 'mock >=2.0,<2.1', 'pylint >=1,<2', 'coverage >=4,<5', 'pylint >=1,<2', 'pytest >=3,<4',
'pytest >=3,<4', 'pytest-cov >=2,<3', 'pytest-timeout >=1,<2', 'pytest-cov >=2,<3', 'pytest-timeout >=1,<2', 'sphinx',
'sphinx', 'sphinx_rtd_theme', 'yapf' 'sphinx_rtd_theme', 'yapf'
], ],
'jupyter': ['jupyter >=1.0,<1.1', 'ipywidgets >=6.0.0.beta5'] 'jupyter': ['jupyter >=1.0,<1.1', 'ipywidgets >=6.0.0.beta5']
}, },

View File

@ -1,15 +1,16 @@
import pytest import pytest
from bonobo import Bag, CsvReader, CsvWriter from bonobo import Bag, CsvReader, CsvWriter, open_fs
from bonobo.constants import BEGIN, END from bonobo.constants import BEGIN, END
from bonobo.execution.node import NodeExecutionContext from bonobo.execution.node import NodeExecutionContext
from bonobo.util.testing import CapturingNodeExecutionContext from bonobo.util.testing import CapturingNodeExecutionContext
def test_write_csv_to_file(tmpdir): def test_write_csv_to_file(tmpdir):
file = tmpdir.join('output.json') fs, filename = open_fs(tmpdir), 'output.csv'
writer = CsvWriter(path=str(file))
context = NodeExecutionContext(writer, None) writer = CsvWriter(path=filename)
context = NodeExecutionContext(writer, services={'fs': fs})
context.recv(BEGIN, Bag({'foo': 'bar'}), Bag({'foo': 'baz', 'ignore': 'this'}), END) context.recv(BEGIN, Bag({'foo': 'bar'}), Bag({'foo': 'baz', 'ignore': 'this'}), END)
@ -18,19 +19,19 @@ def test_write_csv_to_file(tmpdir):
context.step() context.step()
context.stop() context.stop()
assert file.read() == 'foo\nbar\nbaz\n' assert fs.open(filename).read() == 'foo\nbar\nbaz\n'
with pytest.raises(AttributeError): with pytest.raises(AttributeError):
getattr(context, 'file') getattr(context, 'file')
def test_read_csv_from_file(tmpdir): def test_read_csv_from_file(tmpdir):
file = tmpdir.join('input.csv') fs, filename = open_fs(tmpdir), 'input.csv'
file.write('a,b,c\na foo,b foo,c foo\na bar,b bar,c bar') fs.open(filename, 'w').write('a,b,c\na foo,b foo,c foo\na bar,b bar,c bar')
reader = CsvReader(path=str(file), delimiter=',') reader = CsvReader(path=filename, delimiter=',')
context = CapturingNodeExecutionContext(reader, None) context = CapturingNodeExecutionContext(reader, services={'fs': fs})
context.start() context.start()
context.recv(BEGIN, Bag(), END) context.recv(BEGIN, Bag(), END)

View File

@ -1,6 +1,6 @@
import pytest import pytest
from bonobo import Bag, FileReader, FileWriter from bonobo import Bag, FileReader, FileWriter, open_fs
from bonobo.constants import BEGIN, END from bonobo.constants import BEGIN, END
from bonobo.execution.node import NodeExecutionContext from bonobo.execution.node import NodeExecutionContext
from bonobo.util.testing import CapturingNodeExecutionContext from bonobo.util.testing import CapturingNodeExecutionContext
@ -14,10 +14,10 @@ from bonobo.util.testing import CapturingNodeExecutionContext
] ]
) )
def test_file_writer_in_context(tmpdir, lines, output): def test_file_writer_in_context(tmpdir, lines, output):
file = tmpdir.join('output.txt') fs, filename = open_fs(tmpdir), 'output.txt'
writer = FileWriter(path=str(file)) writer = FileWriter(path=filename)
context = NodeExecutionContext(writer, None) context = NodeExecutionContext(writer, services={'fs': fs})
context.start() context.start()
context.recv(BEGIN, *map(Bag, lines), END) context.recv(BEGIN, *map(Bag, lines), END)
@ -25,25 +25,27 @@ def test_file_writer_in_context(tmpdir, lines, output):
context.step() context.step()
context.stop() context.stop()
assert file.read() == output assert fs.open(filename).read() == output
def test_file_writer_out_of_context(tmpdir): def test_file_writer_out_of_context(tmpdir):
file = tmpdir.join('output.txt') fs, filename = open_fs(tmpdir), 'output.txt'
writer = FileWriter(path=str(file))
with writer.open() as fp: writer = FileWriter(path=filename)
with writer.open(fs) as fp:
fp.write('Yosh!') fp.write('Yosh!')
assert file.read() == 'Yosh!' assert fs.open(filename).read() == 'Yosh!'
def test_file_reader_in_context(tmpdir): def test_file_reader_in_context(tmpdir):
file = tmpdir.join('input.txt') fs, filename = open_fs(tmpdir), 'input.txt'
file.write('Hello\nWorld\n')
reader = FileReader(path=str(file)) fs.open(filename, 'w').write('Hello\nWorld\n')
context = CapturingNodeExecutionContext(reader, None)
reader = FileReader(path=filename)
context = CapturingNodeExecutionContext(reader, services={'fs': fs})
context.start() context.start()
context.recv(BEGIN, Bag(), END) context.recv(BEGIN, Bag(), END)

View File

@ -1,22 +1,23 @@
import pytest import pytest
from bonobo import Bag, JsonReader, JsonWriter from bonobo import Bag, JsonReader, JsonWriter, open_fs
from bonobo.constants import BEGIN, END from bonobo.constants import BEGIN, END
from bonobo.execution.node import NodeExecutionContext from bonobo.execution.node import NodeExecutionContext
from bonobo.util.testing import CapturingNodeExecutionContext from bonobo.util.testing import CapturingNodeExecutionContext
def test_write_json_to_file(tmpdir): def test_write_json_to_file(tmpdir):
file = tmpdir.join('output.json') fs, filename = open_fs(tmpdir), 'output.json'
writer = JsonWriter(path=str(file))
context = NodeExecutionContext(writer, None) writer = JsonWriter(path=filename)
context = NodeExecutionContext(writer, services={'fs': fs})
context.start() context.start()
context.recv(BEGIN, Bag({'foo': 'bar'}), END) context.recv(BEGIN, Bag({'foo': 'bar'}), END)
context.step() context.step()
context.stop() context.stop()
assert file.read() == '[\n{"foo": "bar"}\n]' assert fs.open(filename).read() == '[\n{"foo": "bar"}\n]'
with pytest.raises(AttributeError): with pytest.raises(AttributeError):
getattr(context, 'file') getattr(context, 'file')
@ -26,11 +27,11 @@ def test_write_json_to_file(tmpdir):
def test_read_json_from_file(tmpdir): def test_read_json_from_file(tmpdir):
file = tmpdir.join('input.json') fs, filename = open_fs(tmpdir), 'input.json'
file.write('[{"x": "foo"},{"x": "bar"}]') fs.open(filename, 'w').write('[{"x": "foo"},{"x": "bar"}]')
reader = JsonReader(path=str(file)) reader = JsonReader(path=filename)
context = CapturingNodeExecutionContext(reader, None) context = CapturingNodeExecutionContext(reader, services={'fs': fs})
context.start() context.start()
context.recv(BEGIN, Bag(), END) context.recv(BEGIN, Bag(), END)

View File

@ -1,6 +1,7 @@
import pytest import pytest
from bonobo import Graph, BEGIN from bonobo.constants import BEGIN
from bonobo.structs import Graph
identity = lambda x: x identity = lambda x: x

View File

@ -1,4 +1,4 @@
from bonobo import Token from bonobo.structs import Token
def test_token_repr(): def test_token_repr():

View File

@ -1,7 +1,8 @@
from bonobo import Graph, NaiveStrategy, Bag
from bonobo.config.processors import contextual from bonobo.config.processors import contextual
from bonobo.constants import BEGIN, END from bonobo.constants import BEGIN, END
from bonobo.execution.graph import GraphExecutionContext from bonobo.execution.graph import GraphExecutionContext
from bonobo.strategies import NaiveStrategy
from bonobo.structs import Bag, Graph
def generate_integers(): def generate_integers():
@ -9,7 +10,7 @@ def generate_integers():
def square(i: int) -> int: def square(i: int) -> int:
return i**2 return i ** 2
@contextual @contextual

17
tests/test_publicapi.py Normal file
View File

@ -0,0 +1,17 @@
import types
def test_wildcard_import():
bonobo = __import__('bonobo')
assert bonobo.__version__
for name in dir(bonobo):
# ignore attributes starting by underscores
if name.startswith('_'):
continue
attr = getattr(bonobo, name)
if isinstance(attr, types.ModuleType):
continue
assert name in bonobo.__all__