WIP: factory for transformations

This commit is contained in:
Romain Dorgueil
2017-05-21 20:42:54 +02:00
parent 88df694dc1
commit 3059f112f1
10 changed files with 261 additions and 17 deletions

View File

@ -48,10 +48,6 @@ def normalize(row):
return result return result
def filter_france(row):
if row.get('country') == 'France':
yield row
def display(row): def display(row):
print(Style.BRIGHT, row.get('name'), Style.RESET_ALL, sep='') print(Style.BRIGHT, row.get('name'), Style.RESET_ALL, sep='')
@ -73,15 +69,15 @@ def display(row):
print( print(
' - {}address{}: {address}'. ' - {}address{}: {address}'.
format(Fore.BLUE, Style.RESET_ALL, address=', '.join(address)) format(Fore.BLUE, Style.RESET_ALL, address=', '.join(address))
) )
print( print(
' - {}links{}: {links}'. ' - {}links{}: {links}'.
format(Fore.BLUE, Style.RESET_ALL, links=', '.join(row['links'])) format(Fore.BLUE, Style.RESET_ALL, links=', '.join(row['links']))
) )
print( print(
' - {}geometry{}: {geometry}'. ' - {}geometry{}: {geometry}'.
format(Fore.BLUE, Style.RESET_ALL, **row) format(Fore.BLUE, Style.RESET_ALL, **row)
) )
print( print(
' - {}source{}: {source}'.format( ' - {}source{}: {source}'.format(
@ -95,7 +91,7 @@ graph = bonobo.Graph(
dataset=API_DATASET, netloc=API_NETLOC, timezone='Europe/Paris' dataset=API_DATASET, netloc=API_NETLOC, timezone='Europe/Paris'
), ),
normalize, normalize,
filter_france, bonobo.Filter(filter=lambda row: row.get('country') == 'France'),
bonobo.Tee(display), bonobo.Tee(display),
bonobo.JsonWriter(path='fablabs.txt'), bonobo.JsonWriter(path='fablabs.txt'),
) )

View File

@ -0,0 +1,5 @@
from bonobo import get_examples_path, open_fs
def get_services():
    """Return the service mapping for this example.

    The single ``fs`` service is whatever ``open_fs`` returns for the path
    given by ``get_examples_path()``.
    """
    examples_fs = open_fs(get_examples_path())
    return {'fs': examples_fs}

View File

@ -0,0 +1,33 @@
from functools import partial
import itertools
import bonobo
from bonobo.commands.run import get_default_services
from bonobo.config import Configurable
from bonobo.nodes.factory import Factory
from bonobo.nodes.io.json import JsonDictReader
@Factory
def Normalize(self):
    # Body of a ``Factory`` configuration: every call below only *queues* an
    # operation on ``self``; values are transformed later, when the factory is
    # called with actual arguments.
    # NOTE(review): presumably ``@Factory`` turns this function into the
    # factory's ``setup`` method (done by ``Configurable``, not visible here).

    # Coerce positional argument 0 to ``str`` and then apply ``str.title``.
    self[0].str().title()
    # Move positional argument 0 into the ``title`` keyword argument ...
    self.move(0, 'title')
    # ... then move whatever is now at position 0 into ``address``.
    self.move(0, 'address')
class PrettyPrinter(Configurable):
    """Print every received positional and keyword argument, one per line."""

    def call(self, *args, **kwargs):
        """Print ``<index or name> = <value>`` for each argument.

        Positional arguments are labelled with their index, keyword arguments
        with their name. Every line after the first is prefixed with a space.
        """
        labelled = itertools.chain(enumerate(args), kwargs.items())
        for position, (label, content) in enumerate(labelled):
            prefix = ' ' if position else ''
            print(prefix, label, '=', content)
# Pipeline: read 'datasets/coffeeshops.json', pass each row through the
# ``Normalize`` factory, then print the resulting arguments.
graph = bonobo.Graph(
    JsonDictReader('datasets/coffeeshops.json'),
    Normalize(),
    PrettyPrinter(),
)

if __name__ == '__main__':
    # This file's path is handed to ``get_default_services``.
    # NOTE(review): presumably used to locate a sibling services module — verify.
    bonobo.run(graph, services=get_default_services(__file__))

View File

@ -1,7 +0,0 @@
from . import bags, dicts, strings
__all__ = [
'bags',
'dicts',
'strings',
]

View File

@ -3,8 +3,7 @@ from pprint import pprint as _pprint
from colorama import Fore, Style from colorama import Fore, Style
from bonobo.config import Configurable, Option from bonobo.config import Configurable, ContextProcessor, Option
from bonobo.config.processors import ContextProcessor
from bonobo.structs.bags import Bag from bonobo.structs.bags import Bag
from bonobo.util.objects import ValueHolder from bonobo.util.objects import ValueHolder
from bonobo.util.term import CLEAR_EOL from bonobo.util.term import CLEAR_EOL
@ -74,6 +73,7 @@ pprint = Tee(_pprint)
def PrettyPrint(title_keys=('title', 'name', 'id'), print_values=True, sort=True): def PrettyPrint(title_keys=('title', 'name', 'id'), print_values=True, sort=True):
from bonobo.constants import NOT_MODIFIED from bonobo.constants import NOT_MODIFIED
from colorama import Fore, Style
def _pprint(*args, **kwargs): def _pprint(*args, **kwargs):
nonlocal title_keys, sort, print_values nonlocal title_keys, sort, print_values

208
bonobo/nodes/factory.py Normal file
View File

@ -0,0 +1,208 @@
import functools
from functools import partial
from bonobo import Bag
from bonobo.config import Configurable, Method
def _isarg(item):
    """Return True when ``item`` is exactly an ``int`` (a positional index).

    The check is on the exact type, so ``bool`` values are rejected.
    """
    return type(item) is int


def _iskwarg(item):
    """Return True when ``item`` is exactly a ``str`` (a keyword name)."""
    return type(item) is str
class Operation:
    """A deferred transformation of a single positional or keyword argument.

    ``item`` selects the target: an ``int`` is a positional index, a ``str``
    is a keyword name. ``callable`` receives the current value and returns
    its replacement.
    """

    def __init__(self, item, callable):
        self.item, self.callable = item, callable

    def __repr__(self):
        name = self.callable.__name__
        return '<operation {} on {}>'.format(name, self.item)

    def apply(self, *args, **kwargs):
        """Return ``(args, kwargs)`` with the targeted value transformed.

        For a keyword target that is absent, the callable receives ``None``.

        :raises RuntimeError: when ``item`` is neither an int nor a str.
        """
        target = self.item

        if _isarg(target):
            before, current, after = args[0:target], args[target], args[target + 1:]
            return (*before, self.callable(current), *after), kwargs

        if _iskwarg(target):
            updated = dict(kwargs)
            updated[target] = self.callable(kwargs.get(target))
            return args, updated

        raise RuntimeError('Houston, we have a problem...')
class FactoryOperation:
    """A deferred operation that receives every argument at once.

    ``apply`` forwards all positional and keyword arguments to ``callable``
    and returns its result untouched.
    """

    def __init__(self, factory, callable):
        self.factory, self.callable = factory, callable

    def __repr__(self):
        name = self.callable.__name__
        return '<factory operation {}>'.format(name)

    def apply(self, *args, **kwargs):
        """Call the wrapped callable with ``*args`` and ``**kwargs``."""
        handler = self.callable
        return handler(*args, **kwargs)
# Registry mapping a cursor type name (e.g. 'default', 'dict', 'str') to the
# cursor class that ``operation`` instantiates after queuing an operation.
CURSOR_TYPES = {}
def operation(mixed):
    """Turn a cursor method into a lazily queued operation.

    Usable in two ways:

    * ``@operation('name')``: after queuing, the method returns a new cursor
      of the type registered under ``name`` in ``CURSOR_TYPES``.
    * ``@operation``: after queuing, the method returns the same cursor.

    The decorated method is not executed when called on a cursor. Instead an
    ``Operation`` bound to the cursor's item is appended to
    ``cursor.factory.operations``; it later invokes the original method as
    ``m(cursor, value, *args, **kwargs)``.
    """

    def decorator(m, ctype=mixed):
        def lazy_operation(self, *args, **kwargs):
            @functools.wraps(m)
            def actual_operation(x):
                return m(self, x, *args, **kwargs)

            queued = Operation(self.item, actual_operation)
            self.factory.operations.append(queued)

            if not ctype:
                return self
            return CURSOR_TYPES[ctype](self.factory, self.item)

        return lazy_operation

    if isinstance(mixed, str):
        # Called with a cursor type name: hand back the real decorator.
        return decorator
    # Used bare: ``mixed`` is the method itself and the cursor type is kept.
    return decorator(mixed, ctype=None)
def factory_operation(m):
    """Turn a factory method into a lazily queued, chainable operation.

    Calling the decorated method with configuration values does not run ``m``.
    It appends a ``FactoryOperation`` to ``self.operations`` that will later
    call ``m(self, *config, *args, **kwargs)``, and returns ``self`` so calls
    can be chained.
    """

    def lazy_operation(self, *config):
        def bound_operation(*args, **kwargs):
            return m(self, *config, *args, **kwargs)

        # Same effect as decorating with ``functools.wraps(m)``: carry the
        # original name/doc so ``FactoryOperation.__repr__`` stays readable.
        functools.update_wrapper(bound_operation, m)

        self.operations.append(FactoryOperation(self, bound_operation))
        return self

    return lazy_operation
class Cursor():
    """
    Fluent handle on one item of a factory's input.

    ``item`` is either a positional index (int) or a keyword name (str).
    Methods decorated with ``operation`` do not run immediately: calling them
    queues an ``Operation`` on ``self.factory.operations`` and returns a
    cursor so that calls can be chained.
    """

    # Builtin type whose methods ``__getattr__`` exposes as operations;
    # ``None`` disables that fallback.
    _type = None

    def __init__(self, factory, item):
        self.factory = factory
        self.item = item

    @operation('dict')
    def dict(self, x):
        """Coerce the value to ``dict``; chaining continues on the 'dict' cursor type."""
        return x if isinstance(x, dict) else dict(x)

    # NOTE(review): unfinished stub. Unlike ``dict``/``str`` it takes no value
    # parameter, while ``operation`` invokes the wrapped function as
    # ``m(self, x, ...)``; no 'int' cursor type is registered in the code
    # visible here either.
    @operation('int')
    def int(self):
        pass

    @operation('str')
    def str(self, x):
        """Coerce the value to ``str``; chaining continues on the 'str' cursor type."""
        return x if isinstance(x, str) else str(x)

    # NOTE(review): unfinished stub, same caveats as ``int`` above.
    @operation('list')
    def list(self):
        pass

    # NOTE(review): unfinished stub, same caveats as ``int`` above.
    @operation('tuple')
    def tuple(self):
        pass

    def __getattr__(self, item):
        """
        Fall back to the methods of ``_type`` when the cursor has no such attribute.

        For example, a cursor whose ``_type`` is ``str`` resolves ``upper`` to
        ``str.upper`` unless the cursor class overrides it. The resolved
        method is wrapped as a queued operation and cached on the instance.

        :param item: name of the attribute being looked up.
        :raises AttributeError: when ``_type`` is unset or has no such method.
        """
        if self._type and item in self._type.__dict__:
            method = self._type.__dict__[item]

            @operation
            @functools.wraps(method)
            def _operation(self, x, *args, **kwargs):
                return method(x, *args, **kwargs)

            # Cache the bound operation on the instance so that later lookups
            # find it directly and do not go through ``__getattr__`` again.
            setattr(self, item, partial(_operation, self))
            return getattr(self, item)

        raise AttributeError('Unknown operation {}.{}().'.format(type(self).__name__, item, ))


# Cursor class used when no more specific cursor type is requested.
CURSOR_TYPES['default'] = Cursor
class DictCursor(Cursor):
    """Cursor for values that are dicts.

    Besides the operations defined here, attribute lookups fall back to the
    methods of ``dict`` through ``Cursor.__getattr__``.
    """

    _type = dict

    @operation('default')
    def get(self, x, path):
        """Replace the dict with ``x.get(path)``; chaining continues on the
        'default' cursor type."""
        return x.get(path)

    @operation
    def map_keys(self, x, mapping):
        """Rename the keys of ``x`` according to ``mapping``.

        :param mapping: dict of ``{old_key: new_key}``.

        Keys absent from ``mapping`` are kept unchanged. Looking them up
        without a default would rename every unmapped key to ``None``, so all
        but one of their values would be silently lost.
        """
        return {mapping.get(k, k): v for k, v in x.items()}


CURSOR_TYPES['dict'] = DictCursor
class StringCursor(Cursor):
    """
    Cursor for string values: defines no operation of its own, every lookup
    falls back to the methods of ``str`` through ``Cursor.__getattr__``.
    """

    _type = str


CURSOR_TYPES['str'] = StringCursor
class Factory(Configurable):
    """Configurable transformation built from a list of queued operations.

    Operations are registered either through cursors (``factory[0]`` or
    ``factory['name']``) or through factory-level methods such as ``move``.
    Calling the factory applies them in registration order to the given
    arguments and returns the result wrapped in a ``Bag``.
    """

    # Hook invoked at the end of ``__init__`` to register the operations.
    # NOTE(review): presumably supplied by the function decorated with
    # ``@Factory`` (handled by ``Configurable``, which is not visible here).
    setup = Method()

    def __init__(self):
        self.default_cursor_type = 'default'
        self.operations = []
        self.setup()

    @factory_operation
    def move(self, _from, _to, *args, **kwargs):
        """Move one value between positional and/or keyword arguments.

        :param _from: source, a positional index (int) or keyword name (str).
        :param _to: destination, a positional index (int) or keyword name (str).
        :return: the updated ``(args, kwargs)`` pair.
        :raises RuntimeError: when ``_from`` or ``_to`` is neither int nor str.
        """
        if _from == _to:
            return args, kwargs

        if _isarg(_from):
            value = args[_from]
            args = args[:_from] + args[_from + 1:]
        elif _iskwarg(_from):
            value = kwargs[_from]
            # Iterate over items: iterating the dict itself yields bare keys,
            # which cannot be unpacked into ``(k, v)`` pairs.
            kwargs = {k: v for k, v in kwargs.items() if k != _from}
        else:
            raise RuntimeError('Houston, we have a problem...')

        if _isarg(_to):
            # NOTE(review): this overwrites whatever currently sits at index
            # ``_to`` instead of inserting before it — confirm that is intended.
            return (*args[:_to], value, *args[_to + 1:]), kwargs
        if _iskwarg(_to):
            return args, {**kwargs, _to: value}
        raise RuntimeError('Houston, we have a problem...')

    def __call__(self, *args, **kwargs):
        """Apply every queued operation in order and return the result as a ``Bag``."""
        for op in self.operations:
            args, kwargs = op.apply(*args, **kwargs)
        return Bag(*args, **kwargs)

    def __getitem__(self, item):
        """Return a cursor of the default type on ``item`` (index or keyword name)."""
        return CURSOR_TYPES[self.default_cursor_type](self, item)
if __name__ == '__main__':
    # Ad-hoc demo: queue a few operations on a factory, then run it once.
    f = Factory()
    # Positional argument 0: coerce to dict, then rename its keys.
    f[0].dict().map_keys({'foo': 'F00'})
    # Keyword argument 'foo': coerce to str, then upper-case it.
    f['foo'].str().upper()
    print('operations:', f.operations)
    print(f({'foo': 'bisou'}, foo='blah'))
'''
specs:
- rename keys of an input dict (in args, or kwargs) using a translation map.
f = Factory()
f[0]
f['xxx'] =
f[0].dict().get('foo.bar').move_to('foo.baz').apply(str.upper)
f[0].get('foo.*').items().map(str.lower)
f['foo'].keys_map({
'a': 'b'
})
'''

View File

@ -1,5 +1,7 @@
import json import json
from itertools import starmap
from bonobo.structs.bags import Bag
from bonobo.config.processors import ContextProcessor from bonobo.config.processors import ContextProcessor
from .file import FileWriter, FileReader from .file import FileWriter, FileReader
@ -21,6 +23,13 @@ class JsonReader(JsonHandler, FileReader):
yield line yield line
class JsonDictReader(JsonReader):
    """Experimental reader, not part of the public API: expect breakage.

    Loads the file with ``self.loader`` and, for each ``(key, value)`` item of
    the result, yields ``Bag(key, value)``.
    """

    def read(self, fs, file):
        for key, value in self.loader(file).items():
            yield Bag(key, value)
class JsonWriter(JsonHandler, FileWriter): class JsonWriter(JsonHandler, FileWriter):
@ContextProcessor @ContextProcessor
def envelope(self, context, fs, file, lineno): def envelope(self, context, fs, file, lineno):