wip: filesystem as a service.
This commit is contained in:
2
Makefile
2
Makefile
@ -1,7 +1,7 @@
|
||||
# This file has been auto-generated.
|
||||
# All changes will be lost, see Projectfile.
|
||||
#
|
||||
# Updated at 2017-04-25 23:05:05.062813
|
||||
# Updated at 2017-04-27 15:10:12.746712
|
||||
|
||||
PYTHON ?= $(shell which python)
|
||||
PYTHON_BASENAME ?= $(shell basename $(PYTHON))
|
||||
|
||||
@ -22,6 +22,7 @@ enable_features = {
|
||||
|
||||
install_requires = [
|
||||
'colorama >=0.3,<0.4',
|
||||
'fs ==2.0.3',
|
||||
'psutil >=5.2,<5.3',
|
||||
'requests >=2.13,<2.14',
|
||||
'stevedore >=1.19,<1.20',
|
||||
|
||||
@ -7,113 +7,10 @@
|
||||
"""Bonobo data-processing toolkit main module."""
|
||||
|
||||
import sys
|
||||
import warnings
|
||||
|
||||
assert (sys.version_info >= (3, 5)), 'Python 3.5+ is required to use Bonobo.'
|
||||
from bonobo._api import *
|
||||
from bonobo._api import __all__
|
||||
|
||||
from ._version import __version__
|
||||
from .basics import __all__ as __all_basics__
|
||||
from .config import __all__ as __all_config__
|
||||
from .execution import __all__ as __all_execution__
|
||||
from .io import __all__ as __all_io__
|
||||
from .strategies import __all__ as __all_strategies__
|
||||
|
||||
__all__ = __all_basics__ + __all_config__ + __all_execution__ + __all_io__ + __all_strategies__ + [
|
||||
'Bag',
|
||||
'ErrorBag'
|
||||
'Graph',
|
||||
'Token',
|
||||
'__version__',
|
||||
'create_strategy',
|
||||
'get_examples_path',
|
||||
'run',
|
||||
]
|
||||
|
||||
from .basics import *
|
||||
from .config import *
|
||||
from .execution import *
|
||||
from .io import *
|
||||
from .strategies import *
|
||||
from .structs.bags import *
|
||||
from .structs.graphs import *
|
||||
from .structs.tokens import *
|
||||
|
||||
DEFAULT_STRATEGY = 'threadpool'
|
||||
|
||||
STRATEGIES = {
|
||||
'naive': NaiveStrategy,
|
||||
'processpool': ProcessPoolExecutorStrategy,
|
||||
'threadpool': ThreadPoolExecutorStrategy,
|
||||
}
|
||||
|
||||
|
||||
def get_examples_path(*pathsegments):
|
||||
import os
|
||||
import pathlib
|
||||
return str(pathlib.Path(os.path.dirname(__file__), 'examples', *pathsegments))
|
||||
|
||||
|
||||
def create_strategy(name=None):
|
||||
"""
|
||||
Create a strategy, or just returns it if it's already one.
|
||||
|
||||
:param name:
|
||||
:return: Strategy
|
||||
"""
|
||||
from bonobo.strategies.base import Strategy
|
||||
import logging
|
||||
|
||||
if isinstance(name, Strategy):
|
||||
return name
|
||||
|
||||
if name is None:
|
||||
name = DEFAULT_STRATEGY
|
||||
|
||||
logging.debug('Creating strategy {}...'.format(name))
|
||||
|
||||
try:
|
||||
factory = STRATEGIES[name]
|
||||
except KeyError as exc:
|
||||
raise RuntimeError(
|
||||
'Invalid strategy {}. Available choices: {}.'.format(repr(name), ', '.join(sorted(STRATEGIES.keys())))
|
||||
) from exc
|
||||
|
||||
return factory()
|
||||
|
||||
|
||||
def _is_interactive_console():
|
||||
import sys
|
||||
return sys.stdout.isatty()
|
||||
|
||||
|
||||
def _is_jupyter_notebook():
|
||||
try:
|
||||
return get_ipython().__class__.__name__ == 'ZMQInteractiveShell'
|
||||
except NameError:
|
||||
return False
|
||||
|
||||
|
||||
def run(graph, *chain, strategy=None, plugins=None, services=None):
|
||||
if len(chain):
|
||||
warnings.warn('DEPRECATED. You should pass a Graph instance instead of a chain.')
|
||||
from bonobo import Graph
|
||||
graph = Graph(graph, *chain)
|
||||
|
||||
strategy = create_strategy(strategy)
|
||||
plugins = []
|
||||
|
||||
if _is_interactive_console():
|
||||
from bonobo.ext.console import ConsoleOutputPlugin
|
||||
if ConsoleOutputPlugin not in plugins:
|
||||
plugins.append(ConsoleOutputPlugin)
|
||||
|
||||
if _is_jupyter_notebook():
|
||||
from bonobo.ext.jupyter import JupyterOutputPlugin
|
||||
if JupyterOutputPlugin not in plugins:
|
||||
plugins.append(JupyterOutputPlugin)
|
||||
|
||||
return strategy.execute(graph, plugins=plugins, services=services)
|
||||
|
||||
|
||||
__all__ = __all__
|
||||
del sys
|
||||
del warnings
|
||||
|
||||
80
bonobo/_api.py
Normal file
80
bonobo/_api.py
Normal file
@ -0,0 +1,80 @@
|
||||
from bonobo._version import __version__
|
||||
|
||||
__all__ = [
|
||||
'__version__',
|
||||
]
|
||||
|
||||
from bonobo.structs import Bag, Graph
|
||||
|
||||
__all__ += ['Bag', 'Graph']
|
||||
|
||||
# Filesystem. This is a shortcut from the excellent filesystem2 library, that we make available there for convenience.
|
||||
from fs import open_fs as _open_fs
|
||||
open_fs = lambda url, *args, **kwargs: _open_fs(str(url), *args, **kwargs)
|
||||
__all__ += ['open_fs']
|
||||
|
||||
# Basic transformations.
|
||||
from bonobo.basics import *
|
||||
from bonobo.basics import __all__ as _all_basics
|
||||
|
||||
__all__ += _all_basics
|
||||
|
||||
# Execution strategies.
|
||||
from bonobo.strategies import create_strategy
|
||||
|
||||
__all__ += ['create_strategy']
|
||||
|
||||
|
||||
# Extract and loads from stdlib.
|
||||
from bonobo.io import *
|
||||
from bonobo.io import __all__ as _all_io
|
||||
|
||||
__all__ += _all_io
|
||||
|
||||
|
||||
# XXX This may be belonging to the bonobo.examples package.
|
||||
def get_examples_path(*pathsegments):
|
||||
import os
|
||||
import pathlib
|
||||
return str(pathlib.Path(os.path.dirname(__file__), 'examples', *pathsegments))
|
||||
|
||||
|
||||
__all__.append(get_examples_path.__name__)
|
||||
|
||||
|
||||
def _is_interactive_console():
|
||||
import sys
|
||||
return sys.stdout.isatty()
|
||||
|
||||
|
||||
def _is_jupyter_notebook():
|
||||
try:
|
||||
return get_ipython().__class__.__name__ == 'ZMQInteractiveShell'
|
||||
except NameError:
|
||||
return False
|
||||
|
||||
|
||||
# @api
|
||||
def run(graph, *chain, strategy=None, plugins=None, services=None):
|
||||
if len(chain):
|
||||
warnings.warn('DEPRECATED. You should pass a Graph instance instead of a chain.')
|
||||
from bonobo import Graph
|
||||
graph = Graph(graph, *chain)
|
||||
|
||||
strategy = create_strategy(strategy)
|
||||
plugins = []
|
||||
|
||||
if _is_interactive_console():
|
||||
from bonobo.ext.console import ConsoleOutputPlugin
|
||||
if ConsoleOutputPlugin not in plugins:
|
||||
plugins.append(ConsoleOutputPlugin)
|
||||
|
||||
if _is_jupyter_notebook():
|
||||
from bonobo.ext.jupyter import JupyterOutputPlugin
|
||||
if JupyterOutputPlugin not in plugins:
|
||||
plugins.append(JupyterOutputPlugin)
|
||||
|
||||
return strategy.execute(graph, plugins=plugins, services=services)
|
||||
|
||||
|
||||
__all__.append(run.__name__)
|
||||
@ -19,6 +19,7 @@ __all__ = [
|
||||
'noop',
|
||||
]
|
||||
|
||||
|
||||
def identity(x):
|
||||
return x
|
||||
|
||||
|
||||
@ -1,11 +1,12 @@
|
||||
from bonobo.config.configurables import Configurable
|
||||
from bonobo.config.options import Option
|
||||
from bonobo.config.services import Container, Service
|
||||
from bonobo.config.processors import ContextProcessor
|
||||
from bonobo.config.services import Container, Service
|
||||
|
||||
__all__ = [
|
||||
'Configurable',
|
||||
'Container',
|
||||
'ContextProcessor',
|
||||
'Option',
|
||||
'Service',
|
||||
]
|
||||
|
||||
@ -1,16 +1,21 @@
|
||||
from os.path import dirname, realpath, join
|
||||
|
||||
import bonobo
|
||||
from bonobo.ext.opendatasoft import OpenDataSoftAPI
|
||||
|
||||
OUTPUT_FILENAME = realpath(join(dirname(__file__), 'coffeeshops.txt'))
|
||||
filename = 'coffeeshops.txt'
|
||||
|
||||
graph = bonobo.Graph(
|
||||
OpenDataSoftAPI(dataset='liste-des-cafes-a-un-euro', netloc='opendata.paris.fr'),
|
||||
lambda row: '{nom_du_cafe}, {adresse}, {arrondissement} Paris, France'.format(**row),
|
||||
bonobo.FileWriter(path=OUTPUT_FILENAME),
|
||||
bonobo.FileWriter(path=filename),
|
||||
)
|
||||
|
||||
|
||||
def get_services():
|
||||
from os.path import dirname
|
||||
return {
|
||||
'fs': bonobo.open_fs(dirname(__file__))
|
||||
}
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
bonobo.run(graph)
|
||||
print('Import done, read {} for results.'.format(OUTPUT_FILENAME))
|
||||
bonobo.run(graph, services=get_services())
|
||||
|
||||
@ -1,11 +1,10 @@
|
||||
import json
|
||||
import os
|
||||
|
||||
from bonobo import JsonWriter, Graph, get_examples_path
|
||||
from bonobo.basics import Tee
|
||||
from bonobo.ext.opendatasoft import OpenDataSoftAPI
|
||||
|
||||
from colorama import Fore, Style
|
||||
|
||||
import bonobo
|
||||
from bonobo.ext.opendatasoft import OpenDataSoftAPI
|
||||
|
||||
try:
|
||||
import pycountry
|
||||
except ImportError as exc:
|
||||
@ -15,8 +14,6 @@ API_DATASET = 'fablabs-in-the-world'
|
||||
API_NETLOC = 'datanova.laposte.fr'
|
||||
ROWS = 100
|
||||
|
||||
__path__ = os.path.dirname(__file__)
|
||||
|
||||
|
||||
def _getlink(x):
|
||||
return x.get('url', None)
|
||||
@ -55,15 +52,21 @@ def display(row):
|
||||
print(' - {}source{}: {source}'.format(Fore.BLUE, Style.RESET_ALL, source='datanova/' + API_DATASET))
|
||||
|
||||
|
||||
graph = Graph(
|
||||
graph = bonobo.Graph(
|
||||
OpenDataSoftAPI(dataset=API_DATASET, netloc=API_NETLOC, timezone='Europe/Paris'),
|
||||
normalize,
|
||||
filter_france,
|
||||
Tee(display),
|
||||
JsonWriter(path=get_examples_path('datasets/fablabs.txt')),
|
||||
bonobo.Tee(display),
|
||||
bonobo.JsonWriter(path='datasets/fablabs.txt'),
|
||||
)
|
||||
|
||||
if __name__ == '__main__':
|
||||
from bonobo import run
|
||||
|
||||
run(graph)
|
||||
def get_services():
|
||||
from os.path import dirname
|
||||
return {
|
||||
'fs': bonobo.open_fs(dirname(__file__))
|
||||
}
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
bonobo.run(graph, services=get_services())
|
||||
|
||||
@ -1,11 +1,11 @@
|
||||
from bonobo import CsvReader, Graph, get_examples_path
|
||||
import bonobo
|
||||
|
||||
graph = Graph(
|
||||
CsvReader(path=get_examples_path('datasets/coffeeshops.txt')),
|
||||
from ._services import get_services
|
||||
|
||||
graph = bonobo.Graph(
|
||||
bonobo.CsvReader(path='datasets/coffeeshops.txt'),
|
||||
print,
|
||||
)
|
||||
|
||||
if __name__ == '__main__':
|
||||
import bonobo
|
||||
|
||||
bonobo.run(graph)
|
||||
bonobo.run(graph, services=get_services())
|
||||
|
||||
@ -1,8 +1,13 @@
|
||||
import bonobo as bb
|
||||
import bonobo
|
||||
|
||||
from ._services import get_services
|
||||
|
||||
url = 'https://data.toulouse-metropole.fr/explore/dataset/theatres-et-salles-de-spectacles/download?format=json&timezone=Europe/Berlin&use_labels_for_header=true'
|
||||
|
||||
graph = bb.Graph(bb.JsonReader(path=url), print)
|
||||
graph = bonobo.Graph(
|
||||
bonobo.JsonReader(path=url),
|
||||
print
|
||||
)
|
||||
|
||||
if __name__ == '__main__':
|
||||
bb.run(graph)
|
||||
bonobo.run(graph)
|
||||
|
||||
@ -1,9 +1,3 @@
|
||||
from bonobo.execution.graph import GraphExecutionContext, NodeExecutionContext, PluginExecutionContext
|
||||
|
||||
__all__ = [
|
||||
'GraphExecutionContext',
|
||||
'NodeExecutionContext',
|
||||
'PluginExecutionContext',
|
||||
]
|
||||
|
||||
|
||||
|
||||
@ -2,6 +2,7 @@ import sys
|
||||
import traceback
|
||||
from time import sleep
|
||||
|
||||
from bonobo.config import Container
|
||||
from bonobo.config.processors import resolve_processors
|
||||
from bonobo.util.iterators import ensure_tuple
|
||||
from bonobo.util.objects import Wrapper
|
||||
@ -23,9 +24,17 @@ class LoopingExecutionContext(Wrapper):
|
||||
def stopped(self):
|
||||
return self._stopped
|
||||
|
||||
def __init__(self, wrapped, parent):
|
||||
def __init__(self, wrapped, parent, services=None):
|
||||
super().__init__(wrapped)
|
||||
self.parent = parent
|
||||
if services:
|
||||
if parent:
|
||||
raise RuntimeError(
|
||||
'Having services defined both in GraphExecutionContext and child NodeExecutionContext is not supported, for now.')
|
||||
self.services = Container(services) if services else Container()
|
||||
else:
|
||||
self.services = None
|
||||
|
||||
self._started, self._stopped, self._context, self._stack = False, False, None, []
|
||||
|
||||
def start(self):
|
||||
@ -34,7 +43,12 @@ class LoopingExecutionContext(Wrapper):
|
||||
assert self._context is None
|
||||
self._started = True
|
||||
try:
|
||||
self._context = self.parent.services.args_for(self.wrapped) if self.parent else ()
|
||||
if self.parent:
|
||||
self._context = self.parent.services.args_for(self.wrapped)
|
||||
elif self.services:
|
||||
self._context = self.services.args_for(self.wrapped)
|
||||
else:
|
||||
self._context = ()
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
self.handle_error(exc, traceback.format_exc())
|
||||
raise
|
||||
@ -102,4 +116,4 @@ class LoopingExecutionContext(Wrapper):
|
||||
sep='',
|
||||
file=sys.stderr,
|
||||
)
|
||||
print(trace)
|
||||
print(trace)
|
||||
|
||||
@ -2,12 +2,12 @@ import traceback
|
||||
from queue import Empty
|
||||
from time import sleep
|
||||
|
||||
from bonobo.structs.bags import Bag, ErrorBag
|
||||
from bonobo.constants import INHERIT_INPUT, NOT_MODIFIED
|
||||
from bonobo.core.inputs import Input
|
||||
from bonobo.core.statistics import WithStatistics
|
||||
from bonobo.errors import InactiveReadableError
|
||||
from bonobo.execution.base import LoopingExecutionContext
|
||||
from bonobo.structs.bags import Bag, ErrorBag
|
||||
from bonobo.util.iterators import iter_if_not_sequence
|
||||
|
||||
|
||||
@ -21,8 +21,8 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
|
||||
"""todo check if this is right, and where it is used"""
|
||||
return self.input.alive and self._started and not self._stopped
|
||||
|
||||
def __init__(self, wrapped, parent):
|
||||
LoopingExecutionContext.__init__(self, wrapped, parent)
|
||||
def __init__(self, wrapped, parent=None, services=None):
|
||||
LoopingExecutionContext.__init__(self, wrapped, parent=parent, services=services)
|
||||
WithStatistics.__init__(self, 'in', 'out', 'err')
|
||||
|
||||
self.input = Input()
|
||||
@ -115,9 +115,11 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
|
||||
else:
|
||||
self.push(_resolve(input_bag, result))
|
||||
|
||||
|
||||
def is_error(bag):
|
||||
return isinstance(bag, ErrorBag)
|
||||
|
||||
|
||||
def _resolve(input_bag, output):
|
||||
# NotModified means to send the input unmodified to output.
|
||||
if output is NOT_MODIFIED:
|
||||
|
||||
@ -3,7 +3,7 @@ import csv
|
||||
from bonobo.config import Option
|
||||
from bonobo.config.processors import ContextProcessor, contextual
|
||||
from bonobo.util.objects import ValueHolder
|
||||
from .file import FileReader, FileWriter, FileHandler
|
||||
from .file import FileHandler, FileReader, FileWriter
|
||||
|
||||
|
||||
class CsvHandler(FileHandler):
|
||||
@ -41,10 +41,10 @@ class CsvReader(CsvHandler, FileReader):
|
||||
skip = Option(int, default=0)
|
||||
|
||||
@ContextProcessor
|
||||
def csv_headers(self, context, file):
|
||||
def csv_headers(self, context, fs, file):
|
||||
yield ValueHolder(self.headers)
|
||||
|
||||
def read(self, file, headers):
|
||||
def read(self, fs, file, headers):
|
||||
reader = csv.reader(file, delimiter=self.delimiter, quotechar=self.quotechar)
|
||||
headers.value = headers.value or next(reader)
|
||||
field_count = len(headers.value)
|
||||
@ -55,7 +55,7 @@ class CsvReader(CsvHandler, FileReader):
|
||||
|
||||
for row in reader:
|
||||
if len(row) != field_count:
|
||||
raise ValueError('Got a line with %d fields, expecting %d.' % (len(row), field_count, ))
|
||||
raise ValueError('Got a line with %d fields, expecting %d.' % (len(row), field_count,))
|
||||
|
||||
yield dict(zip(headers.value, row))
|
||||
|
||||
@ -63,12 +63,12 @@ class CsvReader(CsvHandler, FileReader):
|
||||
@contextual
|
||||
class CsvWriter(CsvHandler, FileWriter):
|
||||
@ContextProcessor
|
||||
def writer(self, context, file, lineno):
|
||||
writer = csv.writer(file, delimiter=self.delimiter, quotechar=self.quotechar)
|
||||
def writer(self, context, fs, file, lineno):
|
||||
writer = csv.writer(file, delimiter=self.delimiter, quotechar=self.quotechar, lineterminator=self.eol)
|
||||
headers = ValueHolder(list(self.headers) if self.headers else None)
|
||||
yield writer, headers
|
||||
|
||||
def write(self, file, lineno, writer, headers, row):
|
||||
def write(self, fs, file, lineno, writer, headers, row):
|
||||
if not lineno.value:
|
||||
headers.value = headers.value or row.keys()
|
||||
writer.writerow(headers.value)
|
||||
|
||||
@ -1,8 +1,6 @@
|
||||
from io import BytesIO
|
||||
|
||||
from bonobo.config import Option
|
||||
from bonobo.config.processors import ContextProcessor, contextual
|
||||
from bonobo.config import Option, Service
|
||||
from bonobo.config.configurables import Configurable
|
||||
from bonobo.config.processors import ContextProcessor, contextual
|
||||
from bonobo.util.objects import ValueHolder
|
||||
|
||||
__all__ = [
|
||||
@ -13,30 +11,34 @@ __all__ = [
|
||||
|
||||
@contextual
|
||||
class FileHandler(Configurable):
|
||||
"""
|
||||
Abstract component factory for file-related components.
|
||||
|
||||
"""Abstract component factory for file-related components.
|
||||
|
||||
Args:
|
||||
path (str): which path to use within the provided filesystem.
|
||||
eol (str): which character to use to separate lines.
|
||||
mode (str): which mode to use when opening the file.
|
||||
fs (str): service name to use for filesystem.
|
||||
"""
|
||||
|
||||
path = Option(str, required=True)
|
||||
eol = Option(str, default='\n')
|
||||
mode = Option(str)
|
||||
path = Option(str, required=True) # type: str
|
||||
eol = Option(str, default='\n') # type: str
|
||||
mode = Option(str) # type: str
|
||||
|
||||
fs = Service('fs') # type: str
|
||||
|
||||
@ContextProcessor
|
||||
def file(self, context):
|
||||
if self.path.find('http://') == 0 or self.path.find('https://') == 0:
|
||||
import requests
|
||||
response = requests.get(self.path)
|
||||
yield BytesIO(response.content)
|
||||
else:
|
||||
with self.open() as file:
|
||||
yield file
|
||||
def file(self, context, fs):
|
||||
with self.open(fs) as file:
|
||||
yield file
|
||||
|
||||
def open(self):
|
||||
return open(self.path, self.mode)
|
||||
def open(self, fs):
|
||||
return fs.open(self.path, self.mode)
|
||||
|
||||
|
||||
class Reader(FileHandler):
|
||||
"""Abstract component factory for readers.
|
||||
"""
|
||||
|
||||
def __call__(self, *args):
|
||||
yield from self.read(*args)
|
||||
|
||||
@ -45,6 +47,9 @@ class Reader(FileHandler):
|
||||
|
||||
|
||||
class Writer(FileHandler):
|
||||
"""Abstract component factory for writers.
|
||||
"""
|
||||
|
||||
def __call__(self, *args):
|
||||
return self.write(*args)
|
||||
|
||||
@ -53,23 +58,18 @@ class Writer(FileHandler):
|
||||
|
||||
|
||||
class FileReader(Reader):
|
||||
"""
|
||||
Component factory for file-like readers.
|
||||
"""Component factory for file-like readers.
|
||||
|
||||
On its own, it can be used to read a file and yield one row per line, trimming the "eol" character at the end if
|
||||
present. Extending it is usually the right way to create more specific file readers (like json, csv, etc.)
|
||||
|
||||
"""
|
||||
|
||||
mode = Option(str, default='r')
|
||||
|
||||
def read(self, file):
|
||||
def read(self, fs, file):
|
||||
"""
|
||||
Write a row on the next line of given file.
|
||||
Prefix is used for newlines.
|
||||
|
||||
:param ctx:
|
||||
:param row:
|
||||
"""
|
||||
for line in file:
|
||||
yield line.rstrip(self.eol)
|
||||
@ -77,28 +77,22 @@ class FileReader(Reader):
|
||||
|
||||
@contextual
|
||||
class FileWriter(Writer):
|
||||
"""
|
||||
Component factory for file or file-like writers.
|
||||
"""Component factory for file or file-like writers.
|
||||
|
||||
On its own, it can be used to write in a file one line per row that comes into this component. Extending it is
|
||||
usually the right way to create more specific file writers (like json, csv, etc.)
|
||||
|
||||
"""
|
||||
|
||||
mode = Option(str, default='w+')
|
||||
|
||||
@ContextProcessor
|
||||
def lineno(self, context, file):
|
||||
def lineno(self, context, fs, file):
|
||||
lineno = ValueHolder(0, type=int)
|
||||
yield lineno
|
||||
|
||||
def write(self, file, lineno, row):
|
||||
def write(self, fs, file, lineno, row):
|
||||
"""
|
||||
Write a row on the next line of opened file in context.
|
||||
|
||||
:param file fp:
|
||||
:param str row:
|
||||
:param str prefix:
|
||||
"""
|
||||
self._write_line(file, (self.eol if lineno.value else '') + row)
|
||||
lineno.value += 1
|
||||
|
||||
@ -15,7 +15,7 @@ class JsonHandler:
|
||||
class JsonReader(JsonHandler, FileReader):
|
||||
loader = staticmethod(json.load)
|
||||
|
||||
def read(self, file):
|
||||
def read(self, fs, file):
|
||||
for line in self.loader(file):
|
||||
yield line
|
||||
|
||||
@ -23,16 +23,16 @@ class JsonReader(JsonHandler, FileReader):
|
||||
@contextual
|
||||
class JsonWriter(JsonHandler, FileWriter):
|
||||
@ContextProcessor
|
||||
def envelope(self, context, file, lineno):
|
||||
def envelope(self, context, fs, file, lineno):
|
||||
file.write('[\n')
|
||||
yield
|
||||
file.write('\n]')
|
||||
|
||||
def write(self, file, lineno, row):
|
||||
def write(self, fs, file, lineno, row):
|
||||
"""
|
||||
Write a json row on the next line of file pointed by ctx.file.
|
||||
|
||||
:param ctx:
|
||||
:param row:
|
||||
"""
|
||||
return super().write(file, lineno, json.dumps(row))
|
||||
return super().write(fs, file, lineno, json.dumps(row))
|
||||
|
||||
@ -1,8 +1,42 @@
|
||||
from bonobo.strategies.executor import ThreadPoolExecutorStrategy, ProcessPoolExecutorStrategy
|
||||
from bonobo.strategies.executor import ProcessPoolExecutorStrategy, ThreadPoolExecutorStrategy
|
||||
from bonobo.strategies.naive import NaiveStrategy
|
||||
|
||||
__all__ = [
|
||||
'NaiveStrategy',
|
||||
'ProcessPoolExecutorStrategy',
|
||||
'ThreadPoolExecutorStrategy',
|
||||
'create_strategy',
|
||||
]
|
||||
|
||||
STRATEGIES = {
|
||||
'naive': NaiveStrategy,
|
||||
'processpool': ProcessPoolExecutorStrategy,
|
||||
'threadpool': ThreadPoolExecutorStrategy,
|
||||
}
|
||||
|
||||
DEFAULT_STRATEGY = 'threadpool'
|
||||
|
||||
|
||||
def create_strategy(name=None):
|
||||
"""
|
||||
Create a strategy, or just returns it if it's already one.
|
||||
|
||||
:param name:
|
||||
:return: Strategy
|
||||
"""
|
||||
from bonobo.strategies.base import Strategy
|
||||
import logging
|
||||
|
||||
if isinstance(name, Strategy):
|
||||
return name
|
||||
|
||||
if name is None:
|
||||
name = DEFAULT_STRATEGY
|
||||
|
||||
logging.debug('Creating strategy {}...'.format(name))
|
||||
|
||||
try:
|
||||
factory = STRATEGIES[name]
|
||||
except KeyError as exc:
|
||||
raise RuntimeError(
|
||||
'Invalid strategy {}. Available choices: {}.'.format(repr(name), ', '.join(sorted(STRATEGIES.keys())))
|
||||
) from exc
|
||||
|
||||
return factory()
|
||||
@ -0,0 +1,7 @@
|
||||
from bonobo.structs.bags import Bag
|
||||
from bonobo.structs.graphs import Graph
|
||||
from bonobo.structs.tokens import Token
|
||||
|
||||
__all__ = [
|
||||
'Bag', 'Graph', 'Token'
|
||||
]
|
||||
|
||||
@ -4,6 +4,6 @@ from bonobo.execution.node import NodeExecutionContext
|
||||
|
||||
|
||||
class CapturingNodeExecutionContext(NodeExecutionContext):
|
||||
def __init__(self, wrapped, parent):
|
||||
super().__init__(wrapped, parent)
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.send = MagicMock()
|
||||
|
||||
4
setup.py
4
setup.py
@ -41,8 +41,8 @@ setup(
|
||||
description='Bonobo',
|
||||
license='Apache License, Version 2.0',
|
||||
install_requires=[
|
||||
'colorama >=0.3,<0.4', 'psutil >=5.2,<5.3', 'requests >=2.13,<2.14',
|
||||
'stevedore >=1.19,<1.20'
|
||||
'colorama >=0.3,<0.4', 'fs ==2.0.3', 'psutil >=5.2,<5.3',
|
||||
'requests >=2.13,<2.14', 'stevedore >=1.19,<1.20'
|
||||
],
|
||||
version=version,
|
||||
long_description=read('README.rst'),
|
||||
|
||||
@ -1,15 +1,16 @@
|
||||
import pytest
|
||||
|
||||
from bonobo import Bag, CsvReader, CsvWriter
|
||||
from bonobo import Bag, CsvReader, CsvWriter, open_fs
|
||||
from bonobo.constants import BEGIN, END
|
||||
from bonobo.execution.node import NodeExecutionContext
|
||||
from bonobo.util.testing import CapturingNodeExecutionContext
|
||||
|
||||
|
||||
def test_write_csv_to_file(tmpdir):
|
||||
file = tmpdir.join('output.json')
|
||||
writer = CsvWriter(path=str(file))
|
||||
context = NodeExecutionContext(writer, None)
|
||||
fs, filename = open_fs(tmpdir), 'output.csv'
|
||||
|
||||
writer = CsvWriter(path=filename)
|
||||
context = NodeExecutionContext(writer, services={'fs': fs})
|
||||
|
||||
context.recv(BEGIN, Bag({'foo': 'bar'}), Bag({'foo': 'baz', 'ignore': 'this'}), END)
|
||||
|
||||
@ -18,19 +19,19 @@ def test_write_csv_to_file(tmpdir):
|
||||
context.step()
|
||||
context.stop()
|
||||
|
||||
assert file.read() == 'foo\nbar\nbaz\n'
|
||||
assert fs.open(filename).read() == 'foo\nbar\nbaz\n'
|
||||
|
||||
with pytest.raises(AttributeError):
|
||||
getattr(context, 'file')
|
||||
|
||||
|
||||
def test_read_csv_from_file(tmpdir):
|
||||
file = tmpdir.join('input.csv')
|
||||
file.write('a,b,c\na foo,b foo,c foo\na bar,b bar,c bar')
|
||||
fs, filename = open_fs(tmpdir), 'input.csv'
|
||||
fs.open(filename, 'w').write('a,b,c\na foo,b foo,c foo\na bar,b bar,c bar')
|
||||
|
||||
reader = CsvReader(path=str(file), delimiter=',')
|
||||
reader = CsvReader(path=filename, delimiter=',')
|
||||
|
||||
context = CapturingNodeExecutionContext(reader, None)
|
||||
context = CapturingNodeExecutionContext(reader, services={'fs': fs})
|
||||
|
||||
context.start()
|
||||
context.recv(BEGIN, Bag(), END)
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import pytest
|
||||
|
||||
from bonobo import Bag, FileReader, FileWriter
|
||||
from bonobo import Bag, FileReader, FileWriter, open_fs
|
||||
from bonobo.constants import BEGIN, END
|
||||
from bonobo.execution.node import NodeExecutionContext
|
||||
from bonobo.util.testing import CapturingNodeExecutionContext
|
||||
@ -14,10 +14,10 @@ from bonobo.util.testing import CapturingNodeExecutionContext
|
||||
]
|
||||
)
|
||||
def test_file_writer_in_context(tmpdir, lines, output):
|
||||
file = tmpdir.join('output.txt')
|
||||
fs, filename = open_fs(tmpdir), 'output.txt'
|
||||
|
||||
writer = FileWriter(path=str(file))
|
||||
context = NodeExecutionContext(writer, None)
|
||||
writer = FileWriter(path=filename)
|
||||
context = NodeExecutionContext(writer, services={'fs': fs})
|
||||
|
||||
context.start()
|
||||
context.recv(BEGIN, *map(Bag, lines), END)
|
||||
@ -25,25 +25,27 @@ def test_file_writer_in_context(tmpdir, lines, output):
|
||||
context.step()
|
||||
context.stop()
|
||||
|
||||
assert file.read() == output
|
||||
assert fs.open(filename).read() == output
|
||||
|
||||
|
||||
def test_file_writer_out_of_context(tmpdir):
|
||||
file = tmpdir.join('output.txt')
|
||||
writer = FileWriter(path=str(file))
|
||||
fs, filename = open_fs(tmpdir), 'output.txt'
|
||||
|
||||
with writer.open() as fp:
|
||||
writer = FileWriter(path=filename)
|
||||
|
||||
with writer.open(fs) as fp:
|
||||
fp.write('Yosh!')
|
||||
|
||||
assert file.read() == 'Yosh!'
|
||||
assert fs.open(filename).read() == 'Yosh!'
|
||||
|
||||
|
||||
def test_file_reader_in_context(tmpdir):
|
||||
file = tmpdir.join('input.txt')
|
||||
file.write('Hello\nWorld\n')
|
||||
fs, filename = open_fs(tmpdir), 'input.txt'
|
||||
|
||||
reader = FileReader(path=str(file))
|
||||
context = CapturingNodeExecutionContext(reader, None)
|
||||
fs.open(filename, 'w').write('Hello\nWorld\n')
|
||||
|
||||
reader = FileReader(path=filename)
|
||||
context = CapturingNodeExecutionContext(reader, services={'fs': fs})
|
||||
|
||||
context.start()
|
||||
context.recv(BEGIN, Bag(), END)
|
||||
|
||||
@ -1,22 +1,23 @@
|
||||
import pytest
|
||||
|
||||
from bonobo import Bag, JsonReader, JsonWriter
|
||||
from bonobo import Bag, JsonReader, JsonWriter, open_fs
|
||||
from bonobo.constants import BEGIN, END
|
||||
from bonobo.execution.node import NodeExecutionContext
|
||||
from bonobo.util.testing import CapturingNodeExecutionContext
|
||||
|
||||
|
||||
def test_write_json_to_file(tmpdir):
|
||||
file = tmpdir.join('output.json')
|
||||
writer = JsonWriter(path=str(file))
|
||||
context = NodeExecutionContext(writer, None)
|
||||
fs, filename = open_fs(tmpdir), 'output.json'
|
||||
|
||||
writer = JsonWriter(path=filename)
|
||||
context = NodeExecutionContext(writer, services={'fs': fs})
|
||||
|
||||
context.start()
|
||||
context.recv(BEGIN, Bag({'foo': 'bar'}), END)
|
||||
context.step()
|
||||
context.stop()
|
||||
|
||||
assert file.read() == '[\n{"foo": "bar"}\n]'
|
||||
assert fs.open(filename).read() == '[\n{"foo": "bar"}\n]'
|
||||
|
||||
with pytest.raises(AttributeError):
|
||||
getattr(context, 'file')
|
||||
@ -26,11 +27,11 @@ def test_write_json_to_file(tmpdir):
|
||||
|
||||
|
||||
def test_read_json_from_file(tmpdir):
|
||||
file = tmpdir.join('input.json')
|
||||
file.write('[{"x": "foo"},{"x": "bar"}]')
|
||||
reader = JsonReader(path=str(file))
|
||||
fs, filename = open_fs(tmpdir), 'input.json'
|
||||
fs.open(filename, 'w').write('[{"x": "foo"},{"x": "bar"}]')
|
||||
reader = JsonReader(path=filename)
|
||||
|
||||
context = CapturingNodeExecutionContext(reader, None)
|
||||
context = CapturingNodeExecutionContext(reader, services={'fs': fs})
|
||||
|
||||
context.start()
|
||||
context.recv(BEGIN, Bag(), END)
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
import pytest
|
||||
|
||||
from bonobo import Graph, BEGIN
|
||||
from bonobo.constants import BEGIN
|
||||
from bonobo.structs import Graph
|
||||
|
||||
identity = lambda x: x
|
||||
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
from bonobo import Token
|
||||
from bonobo.structs import Token
|
||||
|
||||
|
||||
def test_token_repr():
|
||||
|
||||
@ -1,7 +1,8 @@
|
||||
from bonobo import Graph, NaiveStrategy, Bag
|
||||
from bonobo.config.processors import contextual
|
||||
from bonobo.constants import BEGIN, END
|
||||
from bonobo.execution.graph import GraphExecutionContext
|
||||
from bonobo.strategies import NaiveStrategy
|
||||
from bonobo.structs import Bag, Graph
|
||||
|
||||
|
||||
def generate_integers():
|
||||
@ -9,7 +10,7 @@ def generate_integers():
|
||||
|
||||
|
||||
def square(i: int) -> int:
|
||||
return i**2
|
||||
return i ** 2
|
||||
|
||||
|
||||
@contextual
|
||||
|
||||
17
tests/test_publicapi.py
Normal file
17
tests/test_publicapi.py
Normal file
@ -0,0 +1,17 @@
|
||||
import types
|
||||
|
||||
|
||||
def test_wildcard_import():
|
||||
bonobo = __import__('bonobo')
|
||||
assert bonobo.__version__
|
||||
|
||||
for name in dir(bonobo):
|
||||
# ignore attributes starting by underscores
|
||||
if name.startswith('_'):
|
||||
continue
|
||||
attr = getattr(bonobo, name)
|
||||
if isinstance(attr, types.ModuleType):
|
||||
continue
|
||||
|
||||
assert name in bonobo.__all__
|
||||
|
||||
Reference in New Issue
Block a user