formating, better consistency in readers, ability to read files from http (fast and dirty).

This commit is contained in:
Romain Dorgueil
2017-02-12 08:10:22 +01:00
parent 9dab39a474
commit b035bdea32
33 changed files with 203 additions and 158 deletions

View File

@ -4,7 +4,6 @@
# transformations using a simple directed graph of python callables.
#
# Licensed under Apache License 2.0, read the LICENSE file in the root of the source tree.
"""Bonobo data-processing toolkit main module."""
import sys
@ -62,8 +61,9 @@ def create_strategy(name=None):
try:
factory = STRATEGIES[name]
except KeyError as exc:
raise RuntimeError('Invalid strategy {}. Available choices: {}.'.format(repr(name), ', '.join(
sorted(STRATEGIES.keys())))) from exc
raise RuntimeError(
'Invalid strategy {}. Available choices: {}.'.format(repr(name), ', '.join(sorted(STRATEGIES.keys())))
) from exc
return factory()

View File

@ -11,6 +11,7 @@ def entrypoint(args=None):
subparsers.required = True
commands = {}
def register_extension(ext, commands=commands):
try:
parser = subparsers.add_parser(ext.name)
@ -18,7 +19,9 @@ def entrypoint(args=None):
except Exception:
logging.exception('Error while loading command {}.'.format(ext.name))
mgr = ExtensionManager(namespace='bonobo.commands', )
mgr = ExtensionManager(
namespace='bonobo.commands',
)
mgr.map(register_extension)
args = parser.parse_args(args).__dict__

View File

@ -24,9 +24,10 @@ def execute(file, quiet=False):
graphs = dict((k, v) for k, v in context.items() if isinstance(v, Graph))
assert len(graphs) == 1, ('Having zero or more than one graph definition in one file is unsupported for now, '
'but it is something that will be implemented in the future.\n\nExpected: 1, got: {}.').format(
len(graphs))
assert len(graphs) == 1, (
'Having zero or more than one graph definition in one file is unsupported for now, '
'but it is something that will be implemented in the future.\n\nExpected: 1, got: {}.'
).format(len(graphs))
name, graph = list(graphs.items())[0]

View File

@ -3,6 +3,7 @@ __all__ = [
'Option',
]
class Option:
def __init__(self, type=None, *, required=False, default=None):
self.name = None
@ -11,7 +12,9 @@ class Option:
self.default = default
def __get__(self, inst, typ):
return inst.__options_values__.get(self.name, self.default)
if not self.name in inst.__options_values__:
inst.__options_values__[self.name] = self.default() if callable(self.default) else self.default
return inst.__options_values__[self.name]
def __set__(self, inst, value):
inst.__options_values__[self.name] = self.type(value) if self.type else value

View File

@ -101,8 +101,8 @@ class LoopingExecutionContext(Wrapper):
self._started, self._stopped, self._context, self._stack = False, False, None, []
def start(self):
assert self.state == (False, False), ('{}.start() can only be called on a new node.'
).format(type(self).__name__)
assert self.state == (False,
False), ('{}.start() can only be called on a new node.').format(type(self).__name__)
assert self._context is None
self._started = True
@ -175,6 +175,7 @@ class PluginExecutionContext(LoopingExecutionContext):
LoopingExecutionContext.__init__(self, wrapped, parent)
def shutdown(self):
self.wrapped.finalize(self)
self.alive = False
def step(self):

View File

@ -21,10 +21,7 @@ class Bag:
def args(self):
if self._parent is None:
return self._args
return (
*self._parent.args,
*self._args,
)
return (*self._parent.args, *self._args, )
@property
def kwargs(self):

View File

@ -39,9 +39,11 @@ class ExecutorStrategy(Strategy):
futures.append(executor.submit(_runner))
for node_context in context.nodes:
def _runner(node_context=node_context):
node_context.start()
node_context.loop()
futures.append(executor.submit(_runner))
while context.alive:

View File

@ -1,17 +1,16 @@
from os.path import dirname, realpath, join
from bonobo import console_run
from bonobo.ext.opendatasoft import from_opendatasoft_api
from bonobo.io.file import FileWriter
import bonobo
from bonobo.ext.opendatasoft import OpenDataSoftAPI
OUTPUT_FILENAME = realpath(join(dirname(__file__), 'coffeeshops.txt'))
console_run(
from_opendatasoft_api(
'liste-des-cafes-a-un-euro', netloc='opendata.paris.fr'
),
graph = bonobo.Graph(
OpenDataSoftAPI(dataset='liste-des-cafes-a-un-euro', netloc='opendata.paris.fr'),
lambda row: '{nom_du_cafe}, {adresse}, {arrondissement} Paris, France'.format(**row),
FileWriter(OUTPUT_FILENAME),
bonobo.FileWriter(path=OUTPUT_FILENAME),
)
print('Import done, read {} for results.'.format(OUTPUT_FILENAME))
if __name__ == '__main__':
bonobo.run(graph)
print('Import done, read {} for results.'.format(OUTPUT_FILENAME))

View File

@ -4,7 +4,7 @@ import os
from blessings import Terminal
from bonobo import Tee, JsonWriter, Graph, get_examples_path
from bonobo.ext.opendatasoft import from_opendatasoft_api
from bonobo.ext.opendatasoft import OpenDataSoftAPI
try:
import pycountry
@ -44,8 +44,7 @@ def display(row):
address = list(
filter(
None, (
' '.join(filter(None, (row.get('postal_code', None), row.get('city', None)))),
row.get('county', None),
' '.join(filter(None, (row.get('postal_code', None), row.get('city', None)))), row.get('county', None),
row.get('country'),
)
)
@ -58,9 +57,7 @@ def display(row):
graph = Graph(
from_opendatasoft_api(
API_DATASET, netloc=API_NETLOC, timezone='Europe/Paris'
),
OpenDataSoftAPI(dataset=API_DATASET, netloc=API_NETLOC, timezone='Europe/Paris'),
normalize,
filter_france,
Tee(display),

View File

@ -0,0 +1,30 @@
root:x:0:0:root:/root:/bin/bash
daemon:x:105:1:daemon:/usr/sbin:/usr/sbin/nologin
bin:x:2:2:bin:/bin:/usr/sbin/nologin
sys:x:3:3:sys:/dev:/usr/sbin/nologin
sync:x:4:65534:sync:/bin:/bin/sync
games:x:5:60:games:/usr/games:/usr/sbin/nologin
man:x:6:12:man:/var/cache/man:/usr/sbin/nologin
lp:x:7:7:lp:/var/spool/lpd:/usr/sbin/nologin
mail:x:0:8:mail:/var/mail:/usr/sbin/nologin
news:x:9:9:news:/var/spool/news:/usr/sbin/nologin
uucp:x:10:10:uucp:/var/spool/uucp:/usr/sbin/nologin
proxy:x:13:13:proxy:/bin:/usr/sbin/nologin
www-data:x:33:33:www-data:/var/www:/usr/sbin/nologin
backup:x:33:34:backup:/var/backups:/usr/sbin/nologin
list:x:38:38:Mailing List Manager:/var/list:/usr/sbin/nologin
irc:x:39:39:ircd:/var/run/ircd:/usr/sbin/nologin
gnats:x:41:41:Gnats Bug-Reporting System (admin):/var/lib/gnats:/usr/sbin/nologin
nobody:x:65534:65534:nobody:/nonexistent:/usr/sbin/nologin
systemd-timesync:x:33:103:systemd Time Synchronization,,,:/run/systemd:/bin/false
systemd-network:x:101:104:systemd Network Management,,,:/run/systemd/netif:/bin/false
systemd-resolve:x:102:105:systemd Resolver,,,:/run/systemd/resolve:/bin/false
systemd-bus-proxy:x:103:106:systemd Bus Proxy,,,:/run/systemd:/bin/false
sshd:x:104:65534::/var/run/sshd:/usr/sbin/nologin
ntp:x:105:110::/home/ntp:/bin/false
postfix:x:105:112::/var/spool/postfix:/bin/false
messagebus:x:107:114::/var/run/dbus:/bin/false
debian-security-support:x:108:115:Debian security support check,,,:/var/lib/debian-security-support:/bin/false
snmp:x:109:116::/var/lib/snmp:/usr/sbin/nologin
postgres:x:105:117:PostgreSQL administrator,,,:/var/lib/postgresql:/bin/bash
redis:x:111:118::/var/lib/redis:/bin/false

View File

@ -0,0 +1,8 @@
import bonobo as bb
url = 'https://data.toulouse-metropole.fr/explore/dataset/theatres-et-salles-de-spectacles/download?format=json&timezone=Europe/Berlin&use_labels_for_header=true'
graph = bb.Graph(bb.JsonReader(path=url), print)
if __name__ == '__main__':
bb.run(graph)

View File

@ -1,4 +1,4 @@
from bonobo import FileReader, Graph
from bonobo import FileReader, Graph, get_examples_path
def skip_comments(line):
@ -7,7 +7,7 @@ def skip_comments(line):
graph = Graph(
FileReader(path='/etc/passwd'),
FileReader(path=get_examples_path('datasets/passwd.txt')),
skip_comments,
lambda s: s.split(':'),
lambda l: l[0],

View File

@ -13,7 +13,6 @@ Example on how to use :class:`bonobo.Bag` instances to pass flexible args/kwargs
"""
from random import randint
from bonobo import Bag, Graph
@ -26,10 +25,7 @@ def extract():
def transform(topic: str):
return Bag.inherit(
title=topic.title(),
rand=randint(10, 99)
)
return Bag.inherit(title=topic.title(), rand=randint(10, 99))
def load(topic: str, title: str, rand: int):

View File

@ -35,11 +35,7 @@ def load(row: dict):
print(row)
graph = Graph(
extract,
transform,
load
)
graph = Graph(extract, transform, load)
if __name__ == '__main__':
from bonobo import run

View File

@ -31,11 +31,7 @@ def load(s: str):
print(s)
graph = Graph(
extract,
transform,
load
)
graph = Graph(extract, transform, load)
if __name__ == '__main__':
from bonobo import run

View File

@ -1,3 +1,5 @@
from .plugin import ConsoleOutputPlugin
__all__ = ['ConsoleOutputPlugin', ]
__all__ = [
'ConsoleOutputPlugin',
]

View File

@ -80,30 +80,16 @@ class ConsoleOutputPlugin(Plugin):
if component.alive:
_line = ''.join(
(
t.black('({})'.format(i + 1)),
' ',
t.bold(t.white('+')),
' ',
component.name,
' ',
component.get_statistics_as_string(
debug=debug, profile=profile
),
' ',
t.black('({})'.format(i + 1)), ' ', t.bold(t.white('+')), ' ', component.name, ' ',
component.get_statistics_as_string(debug=debug, profile=profile), ' ',
)
)
else:
_line = t.black(
''.join(
(
'({})'.format(i + 1),
' - ',
component.name,
' ',
component.get_statistics_as_string(
debug=debug, profile=profile
),
' ',
'({})'.format(i + 1), ' - ', component.name, ' ',
component.get_statistics_as_string(debug=debug, profile=profile), ' ',
)
)
)

View File

@ -5,4 +5,6 @@ def _jupyter_nbextension_paths():
return [{'section': 'notebook', 'src': 'static', 'dest': 'bonobo-jupyter', 'require': 'bonobo-jupyter/extension'}]
__all__ = ['JupyterOutputPlugin', ]
__all__ = [
'JupyterOutputPlugin',
]

View File

@ -2,28 +2,39 @@ from urllib.parse import urlencode
import requests # todo: make this a service so we can substitute it ?
from bonobo.config import Configurable, Option
from bonobo.context import ContextProcessor, contextual
from bonobo.util.compat import deprecated
from bonobo.util.objects import ValueHolder
def from_opendatasoft_api(
dataset=None,
endpoint='{scheme}://{netloc}{path}',
scheme='https',
netloc='data.opendatasoft.com',
path='/api/records/1.0/search/',
rows=100,
**kwargs
):
path = path if path.startswith('/') else '/' + path
params = (
('dataset', dataset),
('rows', rows),
) + tuple(sorted(kwargs.items()))
base_url = endpoint.format(scheme=scheme, netloc=netloc, path=path) + '?' + urlencode(params)
def _extract_ods():
nonlocal base_url, rows
start = 0
def path_str(path):
return path if path.startswith('/') else '/' + path
@contextual
class OpenDataSoftAPI(Configurable):
dataset = Option(str, required=True)
endpoint = Option(str, default='{scheme}://{netloc}{path}')
scheme = Option(str, default='https')
netloc = Option(str, default='data.opendatasoft.com')
path = Option(path_str, default='/api/records/1.0/search/')
rows = Option(int, default=100)
kwargs = Option(dict, default=dict)
@ContextProcessor
def compute_path(self, context):
params = (('dataset', self.dataset), ('rows', self.rows), ) + tuple(sorted(self.kwargs.items()))
yield self.endpoint.format(scheme=self.scheme, netloc=self.netloc, path=self.path) + '?' + urlencode(params)
@ContextProcessor
def start(self, context, base_url):
yield ValueHolder(0)
def __call__(self, base_url, start, *args, **kwargs):
while True:
resp = requests.get('{}&start={start}'.format(base_url, start=start))
url = '{}&start={start}'.format(base_url, start=start.value)
resp = requests.get(url)
records = resp.json().get('records', [])
if not len(records):
@ -32,7 +43,14 @@ def from_opendatasoft_api(
for row in records:
yield {**row.get('fields', {}), 'geometry': row.get('geometry', {})}
start += rows
start.value += self.rows
_extract_ods.__name__ = 'extract_' + dataset.replace('-', '_')
return _extract_ods
@deprecated
def from_opendatasoft_api(dataset, **kwargs):
return OpenDataSoftAPI(dataset=dataset, **kwargs)
__all__ = [
'OpenDataSoftAPI',
]

View File

@ -55,10 +55,7 @@ class CsvReader(CsvHandler, FileReader):
for row in reader:
if len(row) != field_count:
raise ValueError('Got a line with %d fields, expecting %d.' % (
len(row),
field_count,
))
raise ValueError('Got a line with %d fields, expecting %d.' % (len(row), field_count, ))
yield dict(zip(headers.value, row))

View File

@ -1,6 +1,9 @@
from io import BytesIO
from bonobo.config import Configurable, Option
from bonobo.context import ContextProcessor
from bonobo.context.processors import contextual
from bonobo.util.file import create_reader
from bonobo.util.objects import ValueHolder
__all__ = [
@ -22,8 +25,13 @@ class FileHandler(Configurable):
@ContextProcessor
def file(self, context):
with self.open() as file:
yield file
if self.path.find('http://') == 0 or self.path.find('https://') == 0:
import requests
response = requests.get(self.path)
yield BytesIO(response.content)
else:
with self.open() as file:
yield file
def open(self):
return open(self.path, self.mode)

View File

@ -3,7 +3,9 @@ import json
from bonobo.context import ContextProcessor, contextual
from .file import FileWriter, FileReader
__all__ = ['JsonWriter', ]
__all__ = [
'JsonWriter',
]
class JsonHandler:
@ -11,8 +13,10 @@ class JsonHandler:
class JsonReader(JsonHandler, FileReader):
loader = staticmethod(json.load)
def read(self, file):
for line in json.load(file):
for line in self.loader(file):
yield line

View File

@ -65,9 +65,8 @@ def PrettyPrint(title_keys=('title', 'name', 'id'), print_values=True, sort=True
if print_values:
for k in sorted(row) if sort else row:
print(
'{t.blue}{k}{t.normal} : {t.black}({tp}){t.normal} {v}{t.clear_eol}'.format(
k=k, v=repr(row[k]), t=term, tp=type(row[k]).__name__
)
'{t.blue}{k}{t.normal} : {t.black}({tp}){t.normal} {v}{t.clear_eol}'.
format(k=k, v=repr(row[k]), t=term, tp=type(row[k]).__name__)
)
yield NOT_MODIFIED

View File

@ -1,6 +1,8 @@
import functools
import struct
import sys
import warnings
def is_platform_little_endian():
@ -22,3 +24,20 @@ def is_platform_mac():
def is_platform_32bit():
return struct.calcsize("P") * 8 < 64
def deprecated(func):
"""This is a decorator which can be used to mark functions
as deprecated. It will result in a warning being emmitted
when the function is used."""
@functools.wraps(func)
def new_func(*args, **kwargs):
warnings.simplefilter('always', DeprecationWarning) # turn off filter
warnings.warn(
"Call to deprecated function {}.".format(func.__name__), category=DeprecationWarning, stacklevel=2
)
warnings.simplefilter('default', DeprecationWarning) # reset filter
return func(*args, **kwargs)
return new_func

View File

@ -19,4 +19,4 @@ class Wrapper:
class ValueHolder:
def __init__(self, value, *, type=None):
self.value = value
self.type = type
self.type = type