diff --git a/Makefile b/Makefile index 1aeed01..e6bded1 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ # This file has been auto-generated. # All changes will be lost, see Projectfile. # -# Updated at 2017-04-27 10:59:55.259076 +# Updated at 2017-04-28 06:33:29.712011 PYTHON ?= $(shell which python) PYTHON_BASENAME ?= $(shell basename $(PYTHON)) diff --git a/Projectfile b/Projectfile index 5826814..f8d446f 100644 --- a/Projectfile +++ b/Projectfile @@ -22,6 +22,7 @@ enable_features = { install_requires = [ 'colorama ==0.3.9', + 'fs ==2.0.3', 'psutil ==5.2.2', 'requests ==2.13.0', 'stevedore ==1.21.0', diff --git a/bonobo/__init__.py b/bonobo/__init__.py index 50eb44d..6123b9a 100644 --- a/bonobo/__init__.py +++ b/bonobo/__init__.py @@ -7,113 +7,10 @@ """Bonobo data-processing toolkit main module.""" import sys -import warnings assert (sys.version_info >= (3, 5)), 'Python 3.5+ is required to use Bonobo.' +from bonobo._api import * +from bonobo._api import __all__ -from ._version import __version__ -from .basics import __all__ as __all_basics__ -from .config import __all__ as __all_config__ -from .execution import __all__ as __all_execution__ -from .io import __all__ as __all_io__ -from .strategies import __all__ as __all_strategies__ - -__all__ = __all_basics__ + __all_config__ + __all_execution__ + __all_io__ + __all_strategies__ + [ - 'Bag', - 'ErrorBag' - 'Graph', - 'Token', - '__version__', - 'create_strategy', - 'get_examples_path', - 'run', -] - -from .basics import * -from .config import * -from .execution import * -from .io import * -from .strategies import * -from .structs.bags import * -from .structs.graphs import * -from .structs.tokens import * - -DEFAULT_STRATEGY = 'threadpool' - -STRATEGIES = { - 'naive': NaiveStrategy, - 'processpool': ProcessPoolExecutorStrategy, - 'threadpool': ThreadPoolExecutorStrategy, -} - - -def get_examples_path(*pathsegments): - import os - import pathlib - return str(pathlib.Path(os.path.dirname(__file__), 'examples', *pathsegments)) - - -def create_strategy(name=None): - """ - Create a strategy, or just returns it if it's already one. - - :param name: - :return: Strategy - """ - from bonobo.strategies.base import Strategy - import logging - - if isinstance(name, Strategy): - return name - - if name is None: - name = DEFAULT_STRATEGY - - logging.debug('Creating strategy {}...'.format(name)) - - try: - factory = STRATEGIES[name] - except KeyError as exc: - raise RuntimeError( - 'Invalid strategy {}. Available choices: {}.'.format(repr(name), ', '.join(sorted(STRATEGIES.keys()))) - ) from exc - - return factory() - - -def _is_interactive_console(): - import sys - return sys.stdout.isatty() - - -def _is_jupyter_notebook(): - try: - return get_ipython().__class__.__name__ == 'ZMQInteractiveShell' - except NameError: - return False - - -def run(graph, *chain, strategy=None, plugins=None, services=None): - if len(chain): - warnings.warn('DEPRECATED. You should pass a Graph instance instead of a chain.') - from bonobo import Graph - graph = Graph(graph, *chain) - - strategy = create_strategy(strategy) - plugins = [] - - if _is_interactive_console(): - from bonobo.ext.console import ConsoleOutputPlugin - if ConsoleOutputPlugin not in plugins: - plugins.append(ConsoleOutputPlugin) - - if _is_jupyter_notebook(): - from bonobo.ext.jupyter import JupyterOutputPlugin - if JupyterOutputPlugin not in plugins: - plugins.append(JupyterOutputPlugin) - - return strategy.execute(graph, plugins=plugins, services=services) - - +__all__ = __all__ del sys -del warnings diff --git a/bonobo/_api.py b/bonobo/_api.py new file mode 100644 index 0000000..a566c58 --- /dev/null +++ b/bonobo/_api.py @@ -0,0 +1,80 @@ +from bonobo._version import __version__ + +__all__ = [ + '__version__', +] + +from bonobo.structs import Bag, Graph + +__all__ += ['Bag', 'Graph'] + +# Filesystem. This is a shortcut from the excellent filesystem2 library, that we make available there for convenience. +from fs import open_fs as _open_fs +open_fs = lambda url, *args, **kwargs: _open_fs(str(url), *args, **kwargs) +__all__ += ['open_fs'] + +# Basic transformations. +from bonobo.basics import * +from bonobo.basics import __all__ as _all_basics + +__all__ += _all_basics + +# Execution strategies. +from bonobo.strategies import create_strategy + +__all__ += ['create_strategy'] + + +# Extract and loads from stdlib. +from bonobo.io import * +from bonobo.io import __all__ as _all_io + +__all__ += _all_io + + +# XXX This may be belonging to the bonobo.examples package. +def get_examples_path(*pathsegments): + import os + import pathlib + return str(pathlib.Path(os.path.dirname(__file__), 'examples', *pathsegments)) + + +__all__.append(get_examples_path.__name__) + + +def _is_interactive_console(): + import sys + return sys.stdout.isatty() + + +def _is_jupyter_notebook(): + try: + return get_ipython().__class__.__name__ == 'ZMQInteractiveShell' + except NameError: + return False + + +# @api +def run(graph, *chain, strategy=None, plugins=None, services=None): + if len(chain): + warnings.warn('DEPRECATED. You should pass a Graph instance instead of a chain.') + from bonobo import Graph + graph = Graph(graph, *chain) + + strategy = create_strategy(strategy) + plugins = [] + + if _is_interactive_console(): + from bonobo.ext.console import ConsoleOutputPlugin + if ConsoleOutputPlugin not in plugins: + plugins.append(ConsoleOutputPlugin) + + if _is_jupyter_notebook(): + from bonobo.ext.jupyter import JupyterOutputPlugin + if JupyterOutputPlugin not in plugins: + plugins.append(JupyterOutputPlugin) + + return strategy.execute(graph, plugins=plugins, services=services) + + +__all__.append(run.__name__) diff --git a/bonobo/basics.py b/bonobo/basics.py index 1eb8065..9709976 100644 --- a/bonobo/basics.py +++ b/bonobo/basics.py @@ -19,6 +19,7 @@ __all__ = [ 'noop', ] + def identity(x): return x diff --git a/bonobo/config/__init__.py b/bonobo/config/__init__.py index 8f5ecc2..c4ba410 100644 --- a/bonobo/config/__init__.py +++ b/bonobo/config/__init__.py @@ -1,11 +1,12 @@ from bonobo.config.configurables import Configurable from bonobo.config.options import Option -from bonobo.config.services import Container, Service from bonobo.config.processors import ContextProcessor +from bonobo.config.services import Container, Service __all__ = [ 'Configurable', 'Container', + 'ContextProcessor', 'Option', 'Service', ] diff --git a/bonobo/examples/datasets/coffeeshops.py b/bonobo/examples/datasets/coffeeshops.py index 7e801d9..294164f 100644 --- a/bonobo/examples/datasets/coffeeshops.py +++ b/bonobo/examples/datasets/coffeeshops.py @@ -1,16 +1,21 @@ -from os.path import dirname, realpath, join - import bonobo from bonobo.ext.opendatasoft import OpenDataSoftAPI -OUTPUT_FILENAME = realpath(join(dirname(__file__), 'coffeeshops.txt')) +filename = 'coffeeshops.txt' graph = bonobo.Graph( OpenDataSoftAPI(dataset='liste-des-cafes-a-un-euro', netloc='opendata.paris.fr'), lambda row: '{nom_du_cafe}, {adresse}, {arrondissement} Paris, France'.format(**row), - bonobo.FileWriter(path=OUTPUT_FILENAME), + bonobo.FileWriter(path=filename), ) + +def get_services(): + from os.path import dirname + return { + 'fs': bonobo.open_fs(dirname(__file__)) + } + + if __name__ == '__main__': - bonobo.run(graph) - print('Import done, read {} for results.'.format(OUTPUT_FILENAME)) + bonobo.run(graph, services=get_services()) diff --git a/bonobo/examples/datasets/fablabs.py b/bonobo/examples/datasets/fablabs.py index 80f6e29..7f1f1a5 100644 --- a/bonobo/examples/datasets/fablabs.py +++ b/bonobo/examples/datasets/fablabs.py @@ -1,11 +1,10 @@ import json -import os - -from bonobo import JsonWriter, Graph, get_examples_path -from bonobo.basics import Tee -from bonobo.ext.opendatasoft import OpenDataSoftAPI from colorama import Fore, Style + +import bonobo +from bonobo.ext.opendatasoft import OpenDataSoftAPI + try: import pycountry except ImportError as exc: @@ -15,8 +14,6 @@ API_DATASET = 'fablabs-in-the-world' API_NETLOC = 'datanova.laposte.fr' ROWS = 100 -__path__ = os.path.dirname(__file__) - def _getlink(x): return x.get('url', None) @@ -55,15 +52,21 @@ def display(row): print(' - {}source{}: {source}'.format(Fore.BLUE, Style.RESET_ALL, source='datanova/' + API_DATASET)) -graph = Graph( +graph = bonobo.Graph( OpenDataSoftAPI(dataset=API_DATASET, netloc=API_NETLOC, timezone='Europe/Paris'), normalize, filter_france, - Tee(display), - JsonWriter(path=get_examples_path('datasets/fablabs.txt')), + bonobo.Tee(display), + bonobo.JsonWriter(path='datasets/fablabs.txt'), ) -if __name__ == '__main__': - from bonobo import run - run(graph) +def get_services(): + from os.path import dirname + return { + 'fs': bonobo.open_fs(dirname(__file__)) + } + + +if __name__ == '__main__': + bonobo.run(graph, services=get_services()) diff --git a/bonobo/examples/files/csv.py b/bonobo/examples/files/csv.py index f3315f7..a4358d3 100644 --- a/bonobo/examples/files/csv.py +++ b/bonobo/examples/files/csv.py @@ -1,11 +1,11 @@ -from bonobo import CsvReader, Graph, get_examples_path +import bonobo -graph = Graph( - CsvReader(path=get_examples_path('datasets/coffeeshops.txt')), +from ._services import get_services + +graph = bonobo.Graph( + bonobo.CsvReader(path='datasets/coffeeshops.txt'), print, ) if __name__ == '__main__': - import bonobo - - bonobo.run(graph) + bonobo.run(graph, services=get_services()) diff --git a/bonobo/examples/files/json.py b/bonobo/examples/files/json.py index 6f9f418..3f6fdd9 100644 --- a/bonobo/examples/files/json.py +++ b/bonobo/examples/files/json.py @@ -1,8 +1,13 @@ -import bonobo as bb +import bonobo + +from ._services import get_services url = 'https://data.toulouse-metropole.fr/explore/dataset/theatres-et-salles-de-spectacles/download?format=json&timezone=Europe/Berlin&use_labels_for_header=true' -graph = bb.Graph(bb.JsonReader(path=url), print) +graph = bonobo.Graph( + bonobo.JsonReader(path=url), + print +) if __name__ == '__main__': - bb.run(graph) + bonobo.run(graph) diff --git a/bonobo/execution/__init__.py b/bonobo/execution/__init__.py index aaf4ba3..ad2defc 100644 --- a/bonobo/execution/__init__.py +++ b/bonobo/execution/__init__.py @@ -1,9 +1,3 @@ from bonobo.execution.graph import GraphExecutionContext, NodeExecutionContext, PluginExecutionContext -__all__ = [ - 'GraphExecutionContext', - 'NodeExecutionContext', - 'PluginExecutionContext', -] - diff --git a/bonobo/execution/base.py b/bonobo/execution/base.py index b84cb70..c8357d6 100644 --- a/bonobo/execution/base.py +++ b/bonobo/execution/base.py @@ -2,6 +2,7 @@ import sys import traceback from time import sleep +from bonobo.config import Container from bonobo.config.processors import resolve_processors from bonobo.util.iterators import ensure_tuple from bonobo.util.objects import Wrapper @@ -23,9 +24,17 @@ class LoopingExecutionContext(Wrapper): def stopped(self): return self._stopped - def __init__(self, wrapped, parent): + def __init__(self, wrapped, parent, services=None): super().__init__(wrapped) self.parent = parent + if services: + if parent: + raise RuntimeError( + 'Having services defined both in GraphExecutionContext and child NodeExecutionContext is not supported, for now.') + self.services = Container(services) if services else Container() + else: + self.services = None + self._started, self._stopped, self._context, self._stack = False, False, None, [] def start(self): @@ -34,7 +43,12 @@ class LoopingExecutionContext(Wrapper): assert self._context is None self._started = True try: - self._context = self.parent.services.args_for(self.wrapped) if self.parent else () + if self.parent: + self._context = self.parent.services.args_for(self.wrapped) + elif self.services: + self._context = self.services.args_for(self.wrapped) + else: + self._context = () except Exception as exc: # pylint: disable=broad-except self.handle_error(exc, traceback.format_exc()) raise @@ -102,4 +116,4 @@ class LoopingExecutionContext(Wrapper): sep='', file=sys.stderr, ) - print(trace) \ No newline at end of file + print(trace) diff --git a/bonobo/execution/node.py b/bonobo/execution/node.py index 14bea86..8822e73 100644 --- a/bonobo/execution/node.py +++ b/bonobo/execution/node.py @@ -2,12 +2,12 @@ import traceback from queue import Empty from time import sleep -from bonobo.structs.bags import Bag, ErrorBag from bonobo.constants import INHERIT_INPUT, NOT_MODIFIED from bonobo.core.inputs import Input from bonobo.core.statistics import WithStatistics from bonobo.errors import InactiveReadableError from bonobo.execution.base import LoopingExecutionContext +from bonobo.structs.bags import Bag, ErrorBag from bonobo.util.iterators import iter_if_not_sequence @@ -21,8 +21,8 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): """todo check if this is right, and where it is used""" return self.input.alive and self._started and not self._stopped - def __init__(self, wrapped, parent): - LoopingExecutionContext.__init__(self, wrapped, parent) + def __init__(self, wrapped, parent=None, services=None): + LoopingExecutionContext.__init__(self, wrapped, parent=parent, services=services) WithStatistics.__init__(self, 'in', 'out', 'err') self.input = Input() @@ -115,9 +115,11 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): else: self.push(_resolve(input_bag, result)) + def is_error(bag): return isinstance(bag, ErrorBag) + def _resolve(input_bag, output): # NotModified means to send the input unmodified to output. if output is NOT_MODIFIED: diff --git a/bonobo/io/csv.py b/bonobo/io/csv.py index 54a3fe6..df84557 100644 --- a/bonobo/io/csv.py +++ b/bonobo/io/csv.py @@ -3,7 +3,7 @@ import csv from bonobo.config import Option from bonobo.config.processors import ContextProcessor, contextual from bonobo.util.objects import ValueHolder -from .file import FileReader, FileWriter, FileHandler +from .file import FileHandler, FileReader, FileWriter class CsvHandler(FileHandler): @@ -41,10 +41,10 @@ class CsvReader(CsvHandler, FileReader): skip = Option(int, default=0) @ContextProcessor - def csv_headers(self, context, file): + def csv_headers(self, context, fs, file): yield ValueHolder(self.headers) - def read(self, file, headers): + def read(self, fs, file, headers): reader = csv.reader(file, delimiter=self.delimiter, quotechar=self.quotechar) headers.value = headers.value or next(reader) field_count = len(headers.value) @@ -55,7 +55,7 @@ class CsvReader(CsvHandler, FileReader): for row in reader: if len(row) != field_count: - raise ValueError('Got a line with %d fields, expecting %d.' % (len(row), field_count, )) + raise ValueError('Got a line with %d fields, expecting %d.' % (len(row), field_count,)) yield dict(zip(headers.value, row)) @@ -63,12 +63,12 @@ class CsvReader(CsvHandler, FileReader): @contextual class CsvWriter(CsvHandler, FileWriter): @ContextProcessor - def writer(self, context, file, lineno): - writer = csv.writer(file, delimiter=self.delimiter, quotechar=self.quotechar) + def writer(self, context, fs, file, lineno): + writer = csv.writer(file, delimiter=self.delimiter, quotechar=self.quotechar, lineterminator=self.eol) headers = ValueHolder(list(self.headers) if self.headers else None) yield writer, headers - def write(self, file, lineno, writer, headers, row): + def write(self, fs, file, lineno, writer, headers, row): if not lineno.value: headers.value = headers.value or row.keys() writer.writerow(headers.value) diff --git a/bonobo/io/file.py b/bonobo/io/file.py index 25c4f84..49c750b 100644 --- a/bonobo/io/file.py +++ b/bonobo/io/file.py @@ -1,8 +1,6 @@ -from io import BytesIO - -from bonobo.config import Option -from bonobo.config.processors import ContextProcessor, contextual +from bonobo.config import Option, Service from bonobo.config.configurables import Configurable +from bonobo.config.processors import ContextProcessor, contextual from bonobo.util.objects import ValueHolder __all__ = [ @@ -13,30 +11,34 @@ __all__ = [ @contextual class FileHandler(Configurable): - """ - Abstract component factory for file-related components. - + """Abstract component factory for file-related components. + + Args: + path (str): which path to use within the provided filesystem. + eol (str): which character to use to separate lines. + mode (str): which mode to use when opening the file. + fs (str): service name to use for filesystem. """ - path = Option(str, required=True) - eol = Option(str, default='\n') - mode = Option(str) + path = Option(str, required=True) # type: str + eol = Option(str, default='\n') # type: str + mode = Option(str) # type: str + + fs = Service('fs') # type: str @ContextProcessor - def file(self, context): - if self.path.find('http://') == 0 or self.path.find('https://') == 0: - import requests - response = requests.get(self.path) - yield BytesIO(response.content) - else: - with self.open() as file: - yield file + def file(self, context, fs): + with self.open(fs) as file: + yield file - def open(self): - return open(self.path, self.mode) + def open(self, fs): + return fs.open(self.path, self.mode) class Reader(FileHandler): + """Abstract component factory for readers. + """ + def __call__(self, *args): yield from self.read(*args) @@ -45,6 +47,9 @@ class Reader(FileHandler): class Writer(FileHandler): + """Abstract component factory for writers. + """ + def __call__(self, *args): return self.write(*args) @@ -53,23 +58,18 @@ class Writer(FileHandler): class FileReader(Reader): - """ - Component factory for file-like readers. + """Component factory for file-like readers. On its own, it can be used to read a file and yield one row per line, trimming the "eol" character at the end if present. Extending it is usually the right way to create more specific file readers (like json, csv, etc.) - """ mode = Option(str, default='r') - def read(self, file): + def read(self, fs, file): """ Write a row on the next line of given file. Prefix is used for newlines. - - :param ctx: - :param row: """ for line in file: yield line.rstrip(self.eol) @@ -77,28 +77,22 @@ class FileReader(Reader): @contextual class FileWriter(Writer): - """ - Component factory for file or file-like writers. + """Component factory for file or file-like writers. On its own, it can be used to write in a file one line per row that comes into this component. Extending it is usually the right way to create more specific file writers (like json, csv, etc.) - """ mode = Option(str, default='w+') @ContextProcessor - def lineno(self, context, file): + def lineno(self, context, fs, file): lineno = ValueHolder(0, type=int) yield lineno - def write(self, file, lineno, row): + def write(self, fs, file, lineno, row): """ Write a row on the next line of opened file in context. - - :param file fp: - :param str row: - :param str prefix: """ self._write_line(file, (self.eol if lineno.value else '') + row) lineno.value += 1 diff --git a/bonobo/io/json.py b/bonobo/io/json.py index 50b84d5..1b9ab46 100644 --- a/bonobo/io/json.py +++ b/bonobo/io/json.py @@ -15,7 +15,7 @@ class JsonHandler: class JsonReader(JsonHandler, FileReader): loader = staticmethod(json.load) - def read(self, file): + def read(self, fs, file): for line in self.loader(file): yield line @@ -23,16 +23,16 @@ class JsonReader(JsonHandler, FileReader): @contextual class JsonWriter(JsonHandler, FileWriter): @ContextProcessor - def envelope(self, context, file, lineno): + def envelope(self, context, fs, file, lineno): file.write('[\n') yield file.write('\n]') - def write(self, file, lineno, row): + def write(self, fs, file, lineno, row): """ Write a json row on the next line of file pointed by ctx.file. :param ctx: :param row: """ - return super().write(file, lineno, json.dumps(row)) + return super().write(fs, file, lineno, json.dumps(row)) diff --git a/bonobo/strategies/__init__.py b/bonobo/strategies/__init__.py index f912c2a..1420da6 100644 --- a/bonobo/strategies/__init__.py +++ b/bonobo/strategies/__init__.py @@ -1,8 +1,42 @@ -from bonobo.strategies.executor import ThreadPoolExecutorStrategy, ProcessPoolExecutorStrategy +from bonobo.strategies.executor import ProcessPoolExecutorStrategy, ThreadPoolExecutorStrategy from bonobo.strategies.naive import NaiveStrategy __all__ = [ - 'NaiveStrategy', - 'ProcessPoolExecutorStrategy', - 'ThreadPoolExecutorStrategy', + 'create_strategy', ] + +STRATEGIES = { + 'naive': NaiveStrategy, + 'processpool': ProcessPoolExecutorStrategy, + 'threadpool': ThreadPoolExecutorStrategy, +} + +DEFAULT_STRATEGY = 'threadpool' + + +def create_strategy(name=None): + """ + Create a strategy, or just returns it if it's already one. + + :param name: + :return: Strategy + """ + from bonobo.strategies.base import Strategy + import logging + + if isinstance(name, Strategy): + return name + + if name is None: + name = DEFAULT_STRATEGY + + logging.debug('Creating strategy {}...'.format(name)) + + try: + factory = STRATEGIES[name] + except KeyError as exc: + raise RuntimeError( + 'Invalid strategy {}. Available choices: {}.'.format(repr(name), ', '.join(sorted(STRATEGIES.keys()))) + ) from exc + + return factory() \ No newline at end of file diff --git a/bonobo/structs/__init__.py b/bonobo/structs/__init__.py index e69de29..bd9361e 100644 --- a/bonobo/structs/__init__.py +++ b/bonobo/structs/__init__.py @@ -0,0 +1,7 @@ +from bonobo.structs.bags import Bag +from bonobo.structs.graphs import Graph +from bonobo.structs.tokens import Token + +__all__ = [ + 'Bag', 'Graph', 'Token' +] diff --git a/bonobo/util/testing.py b/bonobo/util/testing.py index ca400cc..7cf3da0 100644 --- a/bonobo/util/testing.py +++ b/bonobo/util/testing.py @@ -4,6 +4,6 @@ from bonobo.execution.node import NodeExecutionContext class CapturingNodeExecutionContext(NodeExecutionContext): - def __init__(self, wrapped, parent): - super().__init__(wrapped, parent) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) self.send = MagicMock() diff --git a/docs/guide/index.rst b/docs/guide/index.rst index 23cff3c..02082aa 100644 --- a/docs/guide/index.rst +++ b/docs/guide/index.rst @@ -10,6 +10,7 @@ There are a few things that you should know while writing transformations graphs :maxdepth: 2 purity + services Third party integrations :::::::::::::::::::::::: diff --git a/docs/guide/services.rst b/docs/guide/services.rst index 709b9a3..6538488 100644 --- a/docs/guide/services.rst +++ b/docs/guide/services.rst @@ -1,21 +1,35 @@ Services and dependencies (draft implementation) ================================================ +:Status: Draft implementation +:Stability: Alpha +:Last-Modified: 27 apr 2017 + Most probably, you'll want to use external systems within your transformations. Those systems may include databases, apis (using http, for example), filesystems, etc. -For a start, including those services hardcoded in your transformations can do the job, but you'll pretty soon feel -limited, for two main reasons: +You can start by hardcoding those services. That does the job, at first. -* Hardcoded and tightly linked dependencies make your transformation atoms hard to test. +If you're going a little further than that, you'll feel limited, for a few reasons: + +* Hardcoded and tightly linked dependencies make your transformations hard to test, and hard to reuse. * Processing data on your laptop is great, but being able to do it on different systems (or stages), in different - environments, is more realistic. + environments, is more realistic? You probably want to contigure a different database on a staging environment, + preprod environment or production system. Maybe you have silimar systems for different clients and want to select + the system at runtime. Etc. Service injection ::::::::::::::::: -To solve this problem, we introduce a light dependency injection system that basically allows you to define named -dependencies in your transformations, and provide an implementation at runtime. +To solve this problem, we introduce a light dependency injection system. It allows to define named dependencies in +your transformations, and provide an implementation at runtime. + +Class-based transformations +--------------------------- + +To define a service dependency in a class-based transformation, use :class:`bonobo.config.Service`, a special +descriptor (and subclass of :class:`bonobo.config.Option`) that will hold the service names and act as a marker +for runtime resolution of service instances. Let's define such a transformation: @@ -24,7 +38,7 @@ Let's define such a transformation: from bonobo.config import Configurable, Service class JoinDatabaseCategories(Configurable): - database = Service(default='primary_sql_database') + database = Service('primary_sql_database') def __call__(self, database, row): return { @@ -35,28 +49,46 @@ Let's define such a transformation: This piece of code tells bonobo that your transformation expect a sercive called "primary_sql_database", that will be injected to your calls under the parameter name "database". +Function-based transformations +------------------------------ + +No implementation yet, but expect something similar to CBT API, maybe using a `@Service(...)` decorator. + +Execution +--------- + Let's see how to execute it: .. code-block:: python import bonobo - bonobo.run( - [...extract...], + graph = bonobo.graph( + *before, JoinDatabaseCategories(), - [...load...], - services={ - 'primary_sql_database': my_database_service, - } + *after, ) + + if __name__ == '__main__': + bonobo.run( + graph, + services={ + 'primary_sql_database': my_database_service, + } + ) + +A dictionary, or dictionary-like, "services" named argument can be passed to the :func:`bonobo.run` helper. The +"dictionary-like" part is the real keyword here. Bonobo is not a DIC library, and won't become one. So the implementation +provided is pretty basic, and feature-less. But you can use much more evolved libraries instead of the provided +stub, and as long as it works the same (a.k.a implements a dictionary-like interface), the system will use it. -Future -:::::: +Future and proposals +:::::::::::::::::::: This is the first proposed implementation and it will evolve, but looks a lot like how we used bonobo ancestor in production. -You can expect to see the following features pretty soon: +May or may not happen, depending on discussions. * Singleton or prototype based injection (to use spring terminology, see https://www.tutorialspoint.com/spring/spring_bean_scopes.htm), allowing smart factory usage and efficient sharing of @@ -64,11 +96,43 @@ You can expect to see the following features pretty soon: * Lazily resolved parameters, eventually overriden by command line or environment, so you can for example override the database DSN or target filesystem on command line (or with shell environment). * Pool based locks that ensure that only one (or n) transformations are using a given service at the same time. +* Simple config implementation, using a python file for config (ex: bonobo run ... --services=services_prod.py). +* Default configuration for services, using an optional callable (`def get_services(args): ...`). Maybe tie default + configuration to graph, but not really a fan because this is unrelated to graph logic. +* Default implementation for a service in a transformation or in the descriptor. Maybe not a good idea, because it + tends to push forward multiple instances of the same thing, but we maybe... + + A few ideas on how it can be implemented, from the user perspective. + + .. code-block:: python + + # using call + http = Service('http.client')(requests) + + # using more explicit call + http = Service('http.client').set_default_impl(requests) + + # using a decorator + @Service('http.client') + def http(self, services): + import requests + return requests + + # as a default in a subclass of Service + class HttpService(Service): + def get_default_impl(self, services): + import requests + return requests + + # ... then use it as another service + http = HttpService('http.client') + -This is under heavy development, let us know what you think (slack may be a good place for this). +This is under development, let us know what you think (slack may be a good place for this). +The basics already work, and you can try it. Read more ::::::::: -todo: example code. +* See https://github.com/hartym/bonobo-sqlalchemy/blob/work-in-progress/bonobo_sqlalchemy/writers.py#L19 for example usage (work in progress). diff --git a/docs/install.rst b/docs/install.rst index baf02b9..36c7024 100644 --- a/docs/install.rst +++ b/docs/install.rst @@ -1,8 +1,7 @@ Installation ============ -Install with pip -:::::::::::::::: +Bonobo is `available on PyPI `_, and it's the easiest solution to get started. .. code-block:: shell-session @@ -11,29 +10,61 @@ Install with pip Install from source ::::::::::::::::::: +If you want to install an unreleased version, you can use git urls with pip. This is useful when using bonobo as a +dependency of your code and you want to try a forked version of bonobo with your software. You can use the git+http +string in your `requirements.txt` file. However, the best option for development on bonobo directly is not this one, +but editable installs (see below). + .. code-block:: shell-session - $ pip install git+https://github.com/python-bonobo/bonobo.git@master#egg=bonobo + $ pip install git+https://github.com/python-bonobo/bonobo.git@0.2#egg=bonobo Editable install :::::::::::::::: -If you plan on making patches to Bonobo, you should install it as an "editable" package. - +If you plan on making patches to Bonobo, you should install it as an "editable" package, which is a really great pip feature. +Pip will clone your repository in a source directory and create a symlink for it in the site-package directory of your +python interpreter. .. code-block:: shell-session - $ pip install --editable git+https://github.com/python-bonobo/bonobo.git@master#egg=bonobo + $ pip install --editable git+https://github.com/python-bonobo/bonobo.git@0.2#egg=bonobo -Note: `-e` is the shorthand version of `--editable`. +.. note:: You can also use the `-e` flag instead of the long version. +If you can't find the "source" directory, try trunning this: + +.. code-block:: shell-session + + $ python -c "import bonobo; print(bonobo.__path__)" + +Another option is to have a "local" editable install, which means you create the clone by yourself and make an editable install +from the local clone. + +.. code-block:: shell-session + +   $ git clone git@github.com:python-bonobo/bonobo.git + $ cd bonobo + $ pip install --editable . + +You can develop on this clone, but you probably want to add your own repository if you want to push code back and make pull requests. +I usually name the git remote for the main bonobo repository "upstream", and my own repository "origin". + +.. code-block:: shell-session + + $ git remote rename origin upstream + $ git remote add origin git@github.com:hartym/bonobo.git + +Of course, replace my github username by the one you used to fork bonobo. You should be good to go! Windows support ::::::::::::::: -We had some people report that there are problems on the windows platform, mostly due to terminal features. We're trying -to look into that but we don't have good windows experience, no windows box and not enough energy to provide serious -support there. If you have experience in this domain and you're willing to help, you're more than welcome! +There are problems on the windows platform, mostly due to the fact bonobo was not developed by experienced windows users. + +We're trying to look into that but energy available to provide serious support on windows is very limited. +If you have experience in this domain and you're willing to help, you're more than welcome! + .. todo:: diff --git a/setup.py b/setup.py index f028a6e..ed9b4da 100644 --- a/setup.py +++ b/setup.py @@ -41,8 +41,8 @@ setup( description='Bonobo', license='Apache License, Version 2.0', install_requires=[ - 'colorama ==0.3.9', 'psutil ==5.2.2', 'requests ==2.13.0', - 'stevedore ==1.21.0' + 'colorama ==0.3.9', 'fs ==2.0.3', 'psutil ==5.2.2', + 'requests ==2.13.0', 'stevedore ==1.21.0' ], version=version, long_description=read('README.rst'), diff --git a/tests/io/test_csv.py b/tests/io/test_csv.py index 0e09a34..06f3d40 100644 --- a/tests/io/test_csv.py +++ b/tests/io/test_csv.py @@ -1,15 +1,16 @@ import pytest -from bonobo import Bag, CsvReader, CsvWriter +from bonobo import Bag, CsvReader, CsvWriter, open_fs from bonobo.constants import BEGIN, END from bonobo.execution.node import NodeExecutionContext from bonobo.util.testing import CapturingNodeExecutionContext def test_write_csv_to_file(tmpdir): - file = tmpdir.join('output.json') - writer = CsvWriter(path=str(file)) - context = NodeExecutionContext(writer, None) + fs, filename = open_fs(tmpdir), 'output.csv' + + writer = CsvWriter(path=filename) + context = NodeExecutionContext(writer, services={'fs': fs}) context.recv(BEGIN, Bag({'foo': 'bar'}), Bag({'foo': 'baz', 'ignore': 'this'}), END) @@ -18,19 +19,19 @@ def test_write_csv_to_file(tmpdir): context.step() context.stop() - assert file.read() == 'foo\nbar\nbaz\n' + assert fs.open(filename).read() == 'foo\nbar\nbaz\n' with pytest.raises(AttributeError): getattr(context, 'file') def test_read_csv_from_file(tmpdir): - file = tmpdir.join('input.csv') - file.write('a,b,c\na foo,b foo,c foo\na bar,b bar,c bar') + fs, filename = open_fs(tmpdir), 'input.csv' + fs.open(filename, 'w').write('a,b,c\na foo,b foo,c foo\na bar,b bar,c bar') - reader = CsvReader(path=str(file), delimiter=',') + reader = CsvReader(path=filename, delimiter=',') - context = CapturingNodeExecutionContext(reader, None) + context = CapturingNodeExecutionContext(reader, services={'fs': fs}) context.start() context.recv(BEGIN, Bag(), END) diff --git a/tests/io/test_file.py b/tests/io/test_file.py index 71f6785..75740e9 100644 --- a/tests/io/test_file.py +++ b/tests/io/test_file.py @@ -1,6 +1,6 @@ import pytest -from bonobo import Bag, FileReader, FileWriter +from bonobo import Bag, FileReader, FileWriter, open_fs from bonobo.constants import BEGIN, END from bonobo.execution.node import NodeExecutionContext from bonobo.util.testing import CapturingNodeExecutionContext @@ -14,10 +14,10 @@ from bonobo.util.testing import CapturingNodeExecutionContext ] ) def test_file_writer_in_context(tmpdir, lines, output): - file = tmpdir.join('output.txt') + fs, filename = open_fs(tmpdir), 'output.txt' - writer = FileWriter(path=str(file)) - context = NodeExecutionContext(writer, None) + writer = FileWriter(path=filename) + context = NodeExecutionContext(writer, services={'fs': fs}) context.start() context.recv(BEGIN, *map(Bag, lines), END) @@ -25,25 +25,27 @@ def test_file_writer_in_context(tmpdir, lines, output): context.step() context.stop() - assert file.read() == output + assert fs.open(filename).read() == output def test_file_writer_out_of_context(tmpdir): - file = tmpdir.join('output.txt') - writer = FileWriter(path=str(file)) + fs, filename = open_fs(tmpdir), 'output.txt' - with writer.open() as fp: + writer = FileWriter(path=filename) + + with writer.open(fs) as fp: fp.write('Yosh!') - assert file.read() == 'Yosh!' + assert fs.open(filename).read() == 'Yosh!' def test_file_reader_in_context(tmpdir): - file = tmpdir.join('input.txt') - file.write('Hello\nWorld\n') + fs, filename = open_fs(tmpdir), 'input.txt' - reader = FileReader(path=str(file)) - context = CapturingNodeExecutionContext(reader, None) + fs.open(filename, 'w').write('Hello\nWorld\n') + + reader = FileReader(path=filename) + context = CapturingNodeExecutionContext(reader, services={'fs': fs}) context.start() context.recv(BEGIN, Bag(), END) diff --git a/tests/io/test_json.py b/tests/io/test_json.py index 1c8c124..bc10d57 100644 --- a/tests/io/test_json.py +++ b/tests/io/test_json.py @@ -1,22 +1,23 @@ import pytest -from bonobo import Bag, JsonReader, JsonWriter +from bonobo import Bag, JsonReader, JsonWriter, open_fs from bonobo.constants import BEGIN, END from bonobo.execution.node import NodeExecutionContext from bonobo.util.testing import CapturingNodeExecutionContext def test_write_json_to_file(tmpdir): - file = tmpdir.join('output.json') - writer = JsonWriter(path=str(file)) - context = NodeExecutionContext(writer, None) + fs, filename = open_fs(tmpdir), 'output.json' + + writer = JsonWriter(path=filename) + context = NodeExecutionContext(writer, services={'fs': fs}) context.start() context.recv(BEGIN, Bag({'foo': 'bar'}), END) context.step() context.stop() - assert file.read() == '[\n{"foo": "bar"}\n]' + assert fs.open(filename).read() == '[\n{"foo": "bar"}\n]' with pytest.raises(AttributeError): getattr(context, 'file') @@ -26,11 +27,11 @@ def test_write_json_to_file(tmpdir): def test_read_json_from_file(tmpdir): - file = tmpdir.join('input.json') - file.write('[{"x": "foo"},{"x": "bar"}]') - reader = JsonReader(path=str(file)) + fs, filename = open_fs(tmpdir), 'input.json' + fs.open(filename, 'w').write('[{"x": "foo"},{"x": "bar"}]') + reader = JsonReader(path=filename) - context = CapturingNodeExecutionContext(reader, None) + context = CapturingNodeExecutionContext(reader, services={'fs': fs}) context.start() context.recv(BEGIN, Bag(), END) diff --git a/tests/structs/test_graphs.py b/tests/structs/test_graphs.py index a17d433..c1c29c2 100644 --- a/tests/structs/test_graphs.py +++ b/tests/structs/test_graphs.py @@ -1,6 +1,7 @@ import pytest -from bonobo import Graph, BEGIN +from bonobo.constants import BEGIN +from bonobo.structs import Graph identity = lambda x: x diff --git a/tests/structs/test_tokens.py b/tests/structs/test_tokens.py index d576860..1ca2166 100644 --- a/tests/structs/test_tokens.py +++ b/tests/structs/test_tokens.py @@ -1,4 +1,4 @@ -from bonobo import Token +from bonobo.structs import Token def test_token_repr(): diff --git a/tests/test_execution.py b/tests/test_execution.py index e2fe1c2..921e8d9 100644 --- a/tests/test_execution.py +++ b/tests/test_execution.py @@ -1,7 +1,8 @@ -from bonobo import Graph, NaiveStrategy, Bag from bonobo.config.processors import contextual from bonobo.constants import BEGIN, END from bonobo.execution.graph import GraphExecutionContext +from bonobo.strategies import NaiveStrategy +from bonobo.structs import Bag, Graph def generate_integers(): @@ -9,7 +10,7 @@ def generate_integers(): def square(i: int) -> int: - return i**2 + return i ** 2 @contextual diff --git a/tests/test_publicapi.py b/tests/test_publicapi.py new file mode 100644 index 0000000..888e462 --- /dev/null +++ b/tests/test_publicapi.py @@ -0,0 +1,17 @@ +import types + + +def test_wildcard_import(): + bonobo = __import__('bonobo') + assert bonobo.__version__ + + for name in dir(bonobo): + # ignore attributes starting by underscores + if name.startswith('_'): + continue + attr = getattr(bonobo, name) + if isinstance(attr, types.ModuleType): + continue + + assert name in bonobo.__all__ +