Attempt to refactor a bit of context, new count transform that counts the calls, new bonobo.structs package with basic data structures, removal of blessings.

This commit is contained in:
Romain Dorgueil
2017-04-24 23:44:29 +02:00
parent cff32c1612
commit ef2c881075
37 changed files with 405 additions and 145 deletions

View File

@ -10,7 +10,7 @@ install:
- pip install coveralls
script:
- make clean docs test
# - pip install pycountry
# - bin/run_all_examples.sh
- pip install pycountry
- bin/run_all_examples.sh
after_success:
- coveralls

View File

@ -1,7 +1,7 @@
# This file has been auto-generated.
# All changes will be lost, see Projectfile.
#
# Updated at 2017-04-24 21:37:09.094705
# Updated at 2017-04-24 21:40:05.495982
PYTHON ?= $(shell which python)
PYTHON_BASENAME ?= $(shell basename $(PYTHON))

View File

@ -21,12 +21,10 @@ enable_features = {
}
install_requires = [
'blessings >=1.6,<1.7',
'colorama >=0.3,<0.4',
'psutil >=5.0,<5.1',
'requests >=2.12,<2.13',
'stevedore >=1.19,<1.20',
'toolz >=0.8,<0.9',
]
extras_require = {

View File

@ -19,6 +19,10 @@ from .io import __all__ as __all_io__
from .util import __all__ as __all_util__
__all__ = __all_config__ + __all_context__ + __all_core__ + __all_io__ + __all_util__ + [
'Bag',
'ErrorBag',
'Graph',
'Token',
'__version__',
'create_strategy',
'get_examples_path',
@ -29,6 +33,9 @@ from .config import *
from .context import *
from .core import *
from .io import *
from .structs.bags import *
from .structs.graphs import *
from .structs.tokens import *
from .util import *
DEFAULT_STRATEGY = 'threadpool'

6
bonobo/constants.py Normal file
View File

@ -0,0 +1,6 @@
from bonobo.structs.tokens import Token
# Package-wide sentinel tokens, gathered in one module so that other modules
# can import them from a single place.
# Each one is a distinct ``Token`` instance built from a display name.

# NOTE(review): presumably marks the start of a stream/graph — confirm against
# the execution context and Graph code that import it.
BEGIN = Token('Begin')
# NOTE(review): presumably marks the end of a stream — confirm against the
# execution context and strategy code that import it.
END = Token('End')
# NOTE(review): imported by bonobo.structs.bags; looks like a Bag flag meaning
# "reuse the parent's input" — confirm against Bag.
INHERIT_INPUT = Token('InheritInput')
# NOTE(review): returned/yielded by transformations such as ``noop`` and
# ``Limit``; presumably tells the caller to forward the input unchanged — confirm.
NOT_MODIFIED = Token('NotModified')

View File

@ -1,15 +1,16 @@
import traceback
import sys
from functools import partial
from queue import Empty
from time import sleep
from bonobo.constants import BEGIN, END, NOT_MODIFIED, INHERIT_INPUT
from bonobo.context.processors import get_context_processors
from bonobo.core.bags import Bag, INHERIT_INPUT, ErrorBag
from bonobo.core.errors import InactiveReadableError
from bonobo.core.inputs import Input
from bonobo.core.statistics import WithStatistics
from bonobo.errors import InactiveReadableError
from bonobo.structs.bags import Bag, ErrorBag
from bonobo.util.objects import Wrapper
from bonobo.util.tokens import BEGIN, END, NOT_MODIFIED
class GraphExecutionContext:
@ -164,8 +165,15 @@ class LoopingExecutionContext(Wrapper):
:return: to hell
"""
from bonobo.util import terminal as term
print(term.bold(term.red('\U0001F4A3 {} in {}'.format(type(exc).__name__, self.wrapped))))
from colorama import Fore, Style
print(
Style.BRIGHT,
Fore.RED,
'\U0001F4A3 {} in {}'.format(type(exc).__name__, self.wrapped),
Style.RESET_ALL,
sep='',
file=sys.stderr,
)
print(trace)

View File

@ -1,3 +1,5 @@
from functools import partial
import types
_CONTEXT_PROCESSORS_ATTR = '__processors__'
@ -34,7 +36,19 @@ class ContextProcessor:
return self.func(*args, **kwargs)
def add_context_processor(cls_or_func, context_processor):
    """
    Register ``context_processor`` on ``cls_or_func``.

    The processor is appended to the collection stored on ``cls_or_func``
    under the ``_CONTEXT_PROCESSORS_ATTR`` attribute. That attribute must
    already exist: it is read without a default, so a missing attribute
    raises ``AttributeError``.

    :param cls_or_func: object carrying the context processors collection
    :param context_processor: processor to append to that collection
    """
    registered = getattr(cls_or_func, _CONTEXT_PROCESSORS_ATTR)
    registered.append(context_processor)
def contextual(cls_or_func):
"""
Make sure an element has the context processors collection.
:param cls_or_func:
"""
if not add_context_processor.__name__ in cls_or_func.__dict__:
setattr(cls_or_func, add_context_processor.__name__, partial(add_context_processor, cls_or_func))
if isinstance(cls_or_func, types.FunctionType):
try:
getattr(cls_or_func, _CONTEXT_PROCESSORS_ATTR)
@ -44,6 +58,7 @@ def contextual(cls_or_func):
if not _CONTEXT_PROCESSORS_ATTR in cls_or_func.__dict__:
setattr(cls_or_func, _CONTEXT_PROCESSORS_ATTR, [])
_processors = getattr(cls_or_func, _CONTEXT_PROCESSORS_ATTR)
for name, value in cls_or_func.__dict__.items():
if isinstance(value, ContextProcessor):

View File

@ -1,15 +1,10 @@
""" Core required libraries. """
from .bags import Bag, ErrorBag
from .graphs import Graph
from .services import inject, service
from .strategies.executor import ThreadPoolExecutorStrategy, ProcessPoolExecutorStrategy
from .strategies.naive import NaiveStrategy
__all__ = [
'Bag',
'ErrorBag',
'Graph',
'NaiveStrategy',
'ProcessPoolExecutorStrategy',
'ThreadPoolExecutorStrategy',

View File

@ -17,9 +17,9 @@
from abc import ABCMeta, abstractmethod
from queue import Queue
from bonobo.core.errors import AbstractError, InactiveWritableError, InactiveReadableError
from bonobo.errors import AbstractError, InactiveWritableError, InactiveReadableError
from bonobo.constants import BEGIN, END
from bonobo.util import noop
from bonobo.util.tokens import BEGIN, END
BUFFER_SIZE = 8192
@ -111,3 +111,4 @@ class Input(Queue, Readable, Writable):
@property
def alive(self):
return self._runlevel > 0

View File

@ -1,12 +1,13 @@
import time
from threading import Thread
from concurrent.futures import Executor
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from threading import Thread
from bonobo.constants import BEGIN, END
from bonobo.core.strategies.base import Strategy
from bonobo.util.tokens import BEGIN, END
from ..bags import Bag
from bonobo.structs.bags import Bag
class ExecutorStrategy(Strategy):

View File

@ -1,7 +1,6 @@
from bonobo.constants import BEGIN, END
from bonobo.core.strategies.base import Strategy
from bonobo.util.tokens import BEGIN, END
from ..bags import Bag
from bonobo.structs.bags import Bag
class NaiveStrategy(Strategy):

View File

@ -4,6 +4,7 @@ import os
from bonobo import Tee, JsonWriter, Graph, get_examples_path
from bonobo.ext.opendatasoft import OpenDataSoftAPI
from colorama import Fore, Style
try:
import pycountry
except ImportError as exc:
@ -36,7 +37,7 @@ def filter_france(row):
def display(row):
print(t.bold(row.get('name')))
print(Style.BRIGHT, row.get('name'), Style.RESET_ALL, sep='')
address = list(
filter(
@ -47,10 +48,10 @@ def display(row):
)
)
print(' - {}: {address}'.format(t.blue('address'), address=', '.join(address)))
print(' - {}: {links}'.format(t.blue('links'), links=', '.join(row['links'])))
print(' - {}: {geometry}'.format(t.blue('geometry'), **row))
print(' - {}: {source}'.format(t.blue('source'), source='datanova/' + API_DATASET))
print(' - {}address{}: {address}'.format(Fore.BLUE, Style.RESET_ALL, address=', '.join(address)))
print(' - {}links{}: {links}'.format(Fore.BLUE, Style.RESET_ALL, links=', '.join(row['links'])))
print(' - {}geometry{}: {geometry}'.format(Fore.BLUE, Style.RESET_ALL, **row))
print(' - {}source{}: {source}'.format(Fore.BLUE, Style.RESET_ALL, source='datanova/' + API_DATASET))
graph = Graph(

File diff suppressed because one or more lines are too long

View File

@ -1,12 +1,7 @@
import os
import pathlib
import bonobo
workdir = pathlib.Path(os.path.dirname(__file__))
graph = bonobo.Graph(
bonobo.FileReader(path=workdir.joinpath('datasets/coffeeshops.txt')),
bonobo.FileReader(path=bonobo.get_examples_path('datasets/coffeeshops.txt')),
print,
)

View File

@ -1,4 +1,4 @@
from bonobo import run
import bonobo
def generate_data():
@ -15,4 +15,11 @@ def output(x: str):
print(x)
run(generate_data, uppercase, output)
graph = bonobo.Graph(
generate_data,
uppercase,
output,
)
if __name__ == '__main__':
bonobo.run(graph)

View File

View File

@ -0,0 +1,6 @@
import bonobo
graph = bonobo.Graph(range(42), bonobo.count, print)
if __name__ == '__main__':
bonobo.run(graph)

View File

@ -17,12 +17,12 @@
import sys
from functools import lru_cache
import blessings
import os
import psutil
from colorama import Fore, Style
from bonobo.core.plugins import Plugin
from bonobo.util import terminal as t
from bonobo.util.term import CLEAR_EOL, MOVE_CURSOR_UP
@lru_cache(1)
@ -77,28 +77,49 @@ class ConsoleOutputPlugin(Plugin):
for i, component in enumerate(context):
if component.alive:
_line = ''.join(
(
t.black('({})'.format(i + 1)), ' ', t.bold(t.white('+')), ' ', component.name, ' ',
component.get_statistics_as_string(debug=debug, profile=profile), ' ',
)
)
_line = ''.join((
Fore.BLACK,
'({})'.format(i + 1),
Style.RESET_ALL,
' ',
Style.BRIGHT,
'+',
Style.RESET_ALL,
' ',
component.name,
' ',
component.get_statistics_as_string(debug=debug, profile=profile),
Style.RESET_ALL,
' ',
))
else:
_line = t.black(
''.join(
(
'({})'.format(i + 1), ' - ', component.name, ' ',
component.get_statistics_as_string(debug=debug, profile=profile), ' ',
)
)
)
print(prefix + _line + t.clear_eol)
_line = ''.join((
Fore.BLACK,
'({})'.format(i + 1),
' - ',
component.name,
' ',
component.get_statistics_as_string(debug=debug, profile=profile),
Style.RESET_ALL,
' ',
))
print(prefix + _line + '\033[0K')
if append:
# todo handle multiline
print(' `->', ' '.join('{0}: {1}'.format(t.bold(t.white(k)), v) for k, v in append), t.clear_eol)
print(''.join((
' `-> ',
' '.join(
'{}{}{}: {}'.format(
Style.BRIGHT,
k,
Style.RESET_ALL,
v
) for k, v in append),
CLEAR_EOL
)))
t_cnt += 1
if rewind:
print(t.clear_eol)
print(t.move_up * (t_cnt + 2))
print(CLEAR_EOL)
print(MOVE_CURSOR_UP(t_cnt + 2))

View File

@ -19,13 +19,14 @@ class OpenDataSoftAPI(Configurable):
scheme = Option(str, default='https')
netloc = Option(str, default='data.opendatasoft.com')
path = Option(path_str, default='/api/records/1.0/search/')
rows = Option(int, default=100)
rows = Option(int, default=500)
limit = Option(int, default=None)
timezone = Option(str, default='Europe/Paris')
kwargs = Option(dict, default=dict)
@ContextProcessor
def compute_path(self, context):
params = (('dataset', self.dataset), ('rows', self.rows), ('timezone', self.timezone)) + tuple(sorted(self.kwargs.items()))
params = (('dataset', self.dataset), ('timezone', self.timezone)) + tuple(sorted(self.kwargs.items()))
yield self.endpoint.format(scheme=self.scheme, netloc=self.netloc, path=self.path) + '?' + urlencode(params)
@ContextProcessor
@ -33,8 +34,8 @@ class OpenDataSoftAPI(Configurable):
yield ValueHolder(0)
def __call__(self, base_url, start, *args, **kwargs):
while True:
url = '{}&start={start}'.format(base_url, start=start.value)
while (not self.limit) or (self.limit > start):
url = '{}&start={start}&rows={rows}'.format(base_url, start=start.value, rows=self.rows if not self.limit else min(self.rows, self.limit-start))
resp = requests.get(url)
records = resp.json().get('records', [])
@ -42,7 +43,10 @@ class OpenDataSoftAPI(Configurable):
break
for row in records:
yield {**row.get('fields', {}), 'geometry': row.get('geometry', {})}
yield {
**row.get('fields', {}),
'geometry': row.get('geometry', {})
}
start.value += self.rows

View File

View File

@ -1,14 +1,12 @@
import itertools
from bonobo.util.tokens import Token
from bonobo.constants import INHERIT_INPUT
__all__ = [
'Bag',
'ErrorBag',
]
INHERIT_INPUT = Token('InheritInput')
class Bag:
def __init__(self, *args, _flags=None, _parent=None, **kwargs):

View File

@ -1,4 +1,4 @@
from bonobo.util.tokens import BEGIN
from bonobo.constants import BEGIN
class Graph:

View File

@ -6,9 +6,3 @@ class Token:
def __repr__(self):
return '<{}>'.format(self.__name__)
BEGIN = Token('Begin')
END = Token('End')
NOT_MODIFIED = Token('NotModified')

View File

@ -3,21 +3,22 @@
import functools
from pprint import pprint as _pprint
from .tokens import NOT_MODIFIED
from colorama import Fore, Style
import colorama as _colorama
_colorama.init()
import blessings as _blessings
terminal = _blessings.Terminal()
from bonobo.constants import NOT_MODIFIED
from bonobo.context.processors import contextual
from bonobo.structs.bags import Bag
from bonobo.util.objects import ValueHolder
from bonobo.util.term import CLEAR_EOL
__all__ = [
'Limit',
'NOT_MODIFIED',
'PrettyPrint',
'Tee',
'count',
'noop',
'pprint',
'terminal',
]
@ -34,7 +35,7 @@ def Limit(n=10):
if i <= n:
yield NOT_MODIFIED
_limit.__name__ = 'limit({})'.format(n)
_limit.__name__ = 'Limit({})'.format(n)
return _limit
@ -48,25 +49,52 @@ def Tee(f):
return wrapped
@contextual
def count(counter, *args, **kwargs):
counter += 1
@count.add_context_processor
def _count_counter(self, context):
counter = ValueHolder(0)
yield counter
context.send(Bag(counter.value))
pprint = Tee(_pprint)
def PrettyPrint(title_keys=('title', 'name', 'id'), print_values=True, sort=True):
def _pprint(*args, **kwargs):
global terminal
nonlocal title_keys, sort, print_values
row = args[0]
for key in title_keys:
if key in row:
print(term.bold(row.get(key)))
print(
Style.BRIGHT,
row.get(key),
Style.RESET_ALL,
sep=''
)
break
if print_values:
for k in sorted(row) if sort else row:
print(
'{t.blue}{k}{t.normal} : {t.black}({tp}){t.normal} {v}{t.clear_eol}'.
format(k=k, v=repr(row[k]), t=term, tp=type(row[k]).__name__)
'',
Fore.BLUE,
k,
Style.RESET_ALL,
' : ',
Fore.BLACK,
'(',
type(row[k]).__name__,
')',
Style.RESET_ALL,
' ',
repr(row[k]),
CLEAR_EOL,
)
yield NOT_MODIFIED
@ -76,41 +104,5 @@ def PrettyPrint(title_keys=('title', 'name', 'id'), print_values=True, sort=True
return _pprint
'''
Old code from rdc.etl
def writehr(self, label=None):
width = t.width or 80
if label:
label = str(label)
sys.stderr.write(t.black('·' * 4) + shade('{') + label + shade('}') + t.black('·' * (width - (6+len(label)) - 1)) + '\n')
else:
sys.stderr.write(t.black('·' * (width-1) + '\n'))
def writeln(self, s):
"""Output method."""
sys.stderr.write(self.format(s) + '\n')
def initialize(self):
self.lineno = 0
def transform(self, hash, channel=STDIN):
"""Actual transformation."""
self.lineno += 1
if not self.condition or self.condition(hash):
hash = hash.copy()
hash = hash if not isinstance(self.field_filter, collections.Callable) else hash.restrict(self.field_filter)
if self.clean:
hash = hash.restrict(lambda k: len(k) and k[0] != '_')
self.writehr(self.lineno)
self.writeln(hash)
self.writehr()
sys.stderr.write('\n')
yield hash
'''
def noop(*args, **kwargs): # pylint: disable=unused-argument
return NOT_MODIFIED

View File

@ -20,3 +20,81 @@ class ValueHolder:
def __init__(self, value, *, type=None):
self.value = value
self.type = type
# Operator support for ValueHolder: every operator is delegated to the wrapped
# ``self.value``.
#
# NOTE: overriding ``__eq__`` without defining ``__hash__`` makes instances of
# the class unhashable (Python sets ``__hash__`` to ``None`` in that case).


# --- rich comparisons -------------------------------------------------------
def __lt__(self, other):
    """Return ``self.value < other``."""
    return self.value < other


def __le__(self, other):
    """Return ``self.value <= other``."""
    return self.value <= other


def __eq__(self, other):
    """Return ``self.value == other``."""
    return self.value == other


def __ne__(self, other):
    """Return ``self.value != other``."""
    return self.value != other


def __gt__(self, other):
    """Return ``self.value > other``."""
    return self.value > other


def __ge__(self, other):
    """Return ``self.value >= other``."""
    return self.value >= other


# --- arithmetic -------------------------------------------------------------
# Binary and reflected operators return the raw result, not a new holder.
# In-place operators mutate the wrapped value and MUST return ``self``:
# ``holder += x`` rebinds ``holder`` to whatever the method returns, so an
# implicit ``None`` would replace the holder with ``None`` in the caller.
def __add__(self, other):
    """Return ``self.value + other``."""
    return self.value + other


def __radd__(self, other):
    """Return ``other + self.value``."""
    return other + self.value


def __iadd__(self, other):
    """Add ``other`` to the wrapped value in place and return the holder."""
    self.value += other
    return self


def __sub__(self, other):
    """Return ``self.value - other``."""
    return self.value - other


def __rsub__(self, other):
    """Return ``other - self.value``."""
    return other - self.value


def __isub__(self, other):
    """Subtract ``other`` from the wrapped value in place and return the holder."""
    self.value -= other
    return self


def __mul__(self, other):
    """Return ``self.value * other``."""
    return self.value * other


def __rmul__(self, other):
    """Return ``other * self.value``."""
    return other * self.value


def __imul__(self, other):
    """Multiply the wrapped value by ``other`` in place and return the holder."""
    self.value *= other
    return self


def __matmul__(self, other):
    """Return ``self.value @ other``."""
    return self.value @ other


def __rmatmul__(self, other):
    """Return ``other @ self.value``."""
    return other @ self.value


def __imatmul__(self, other):
    """Matrix-multiply the wrapped value by ``other`` in place and return the holder."""
    self.value @= other
    return self


def __truediv__(self, other):
    """Return ``self.value / other``."""
    return self.value / other


def __rtruediv__(self, other):
    """Return ``other / self.value``."""
    return other / self.value


def __itruediv__(self, other):
    """Divide the wrapped value by ``other`` in place and return the holder."""
    self.value /= other
    return self
"""
object.__matmul__(self, other)
object.__truediv__(self, other)
object.__floordiv__(self, other)
object.__mod__(self, other)
object.__divmod__(self, other)
object.__pow__(self, other[, modulo])
object.__lshift__(self, other)
object.__rshift__(self, other)
object.__and__(self, other)
object.__xor__(self, other)
object.__or__(self, other)
"""

2
bonobo/util/term.py Normal file
View File

@ -0,0 +1,2 @@
# ANSI control sequence "Erase in Line" with parameter 0: clears from the
# cursor position to the end of the current line.
CLEAR_EOL = '\033[0K'


def MOVE_CURSOR_UP(n):
    """
    Build the ANSI "Cursor Up" control sequence for ``n`` lines.

    :param n: number of lines to move up; formatted verbatim into the sequence
    :return: the escape sequence as a string, e.g. ``'\\033[3A'`` for ``n=3``
    """
    return '\033[{}A'.format(n)

View File

@ -6,9 +6,7 @@ dependencies:
- wheel=0.29.0=py35_0
- pip:
- psycopg2 >=2.6.1
- blessings >=1.6,<1.7
- colorama >=0.3,<0.4
- psutil >=5.0,<5.1
- requests >=2.12,<2.13
- stevedore >=1.19,<1.20
- toolz >=0.8,<0.9

View File

@ -41,8 +41,8 @@ setup(
description='Bonobo',
license='Apache License, Version 2.0',
install_requires=[
'blessings >=1.6,<1.7', 'colorama >=0.3,<0.4', 'psutil >=5.0,<5.1',
'requests >=2.12,<2.13', 'stevedore >=1.19,<1.20', 'toolz >=0.8,<0.9'
'colorama >=0.3,<0.4', 'psutil >=5.0,<5.1', 'requests >=2.12,<2.13',
'stevedore >=1.19,<1.20'
],
version=version,
long_description=read('README.rst'),

View File

@ -1,6 +1,6 @@
from bonobo import Graph, NaiveStrategy, Bag, contextual
from bonobo.constants import BEGIN, END
from bonobo.context.execution import GraphExecutionContext
from bonobo.util.tokens import BEGIN, END
def generate_integers():

View File

@ -18,9 +18,9 @@ from queue import Empty
import pytest
from bonobo.core.errors import InactiveWritableError, InactiveReadableError
from bonobo.constants import BEGIN, END
from bonobo.core.inputs import Input
from bonobo.util.tokens import BEGIN, END
from bonobo.errors import InactiveWritableError, InactiveReadableError
def test_input_runlevels():

View File

@ -1,9 +1,9 @@
import pytest
from bonobo import Bag, CsvReader, CsvWriter
from bonobo.constants import BEGIN, END
from bonobo.context.execution import NodeExecutionContext
from bonobo.util.testing import CapturingNodeExecutionContext
from bonobo.util.tokens import BEGIN, END
def test_write_csv_to_file(tmpdir):

View File

@ -3,7 +3,7 @@ import pytest
from bonobo import FileWriter, Bag, FileReader
from bonobo.context.execution import NodeExecutionContext
from bonobo.util.testing import CapturingNodeExecutionContext
from bonobo.util.tokens import BEGIN, END
from bonobo.constants import BEGIN, END
@pytest.mark.parametrize(

View File

@ -1,10 +1,9 @@
import pytest
from bonobo import Bag, JsonWriter, JsonReader
from bonobo.constants import BEGIN, END
from bonobo.context.execution import NodeExecutionContext
from bonobo.util.objects import ValueHolder
from bonobo.util.testing import CapturingNodeExecutionContext
from bonobo.util.tokens import BEGIN, END
def test_write_json_to_file(tmpdir):

View File

@ -1,7 +1,7 @@
from mock import Mock
from bonobo import Bag
from bonobo.core.bags import INHERIT_INPUT
from bonobo.constants import INHERIT_INPUT
args = ('foo', 'bar',)
kwargs = dict(acme='corp')

View File

@ -1,7 +1,6 @@
import pytest
from bonobo.core.graphs import Graph
from bonobo.util.tokens import BEGIN
from bonobo import Graph, BEGIN
identity = lambda x: x

View File

@ -1,4 +1,4 @@
from bonobo.util.tokens import Token
from bonobo import Token
def test_token_repr():