From e5483de3443c6991e21393b73b34cf27fbf24840 Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Mon, 5 Jun 2017 11:38:11 +0200 Subject: [PATCH] [core] I/O formats allowing both arg0 formatting and kwargs based. Starting with 0.4, kwargs based will be default (BC break here, but needed for the greater good). --- bonobo/examples/files/csv_handlers.py | 4 +- bonobo/examples/files/json_handlers.py | 7 +- bonobo/examples/files/pickle_handlers.py | 83 ++++++++++++------------ bonobo/nodes/io/csv.py | 36 ++-------- bonobo/nodes/io/file.py | 34 ++++++++-- bonobo/nodes/io/json.py | 2 +- bonobo/nodes/io/pickle.py | 2 +- bonobo/settings.py | 25 +++++++ 8 files changed, 111 insertions(+), 82 deletions(-) diff --git a/bonobo/examples/files/csv_handlers.py b/bonobo/examples/files/csv_handlers.py index a9094b6..a15444d 100644 --- a/bonobo/examples/files/csv_handlers.py +++ b/bonobo/examples/files/csv_handlers.py @@ -2,8 +2,8 @@ import bonobo from bonobo.commands.run import get_default_services graph = bonobo.Graph( - bonobo.CsvReader('datasets/coffeeshops.txt'), - print, + bonobo.CsvReader('datasets/coffeeshops.txt', headers=('item',)), + bonobo.PrettyPrinter(), ) if __name__ == '__main__': diff --git a/bonobo/examples/files/json_handlers.py b/bonobo/examples/files/json_handlers.py index 86fe6a0..27dc38e 100644 --- a/bonobo/examples/files/json_handlers.py +++ b/bonobo/examples/files/json_handlers.py @@ -1,15 +1,16 @@ import bonobo +from bonobo import Bag from bonobo.commands.run import get_default_services -def get_fields(row): - return row['fields'] +def get_fields(**row): + return Bag(**row['fields']) graph = bonobo.Graph( bonobo.JsonReader('datasets/theaters.json'), get_fields, - bonobo.PrettyPrint(title_keys=('eq_nom_equipement', )), + bonobo.PrettyPrinter(), ) if __name__ == '__main__': diff --git a/bonobo/examples/files/pickle_handlers.py b/bonobo/examples/files/pickle_handlers.py index e6f3dcc..6863076 100644 --- a/bonobo/examples/files/pickle_handlers.py +++ 
b/bonobo/examples/files/pickle_handlers.py @@ -1,10 +1,38 @@ +''' +This example shows how a different file system service can be injected +into a transformation (as compressing pickled objects often makes sense +anyways). The pickle itself contains a list of lists as follows: + +``` +[ + ['category', 'sms'], + ['ham', 'Go until jurong point, crazy..'], + ['ham', 'Ok lar... Joking wif u oni...'], + ['spam', 'Free entry in 2 a wkly comp to win...'], + ['ham', 'U dun say so early hor... U c already then say...'], + ['ham', 'Nah I don't think he goes to usf, he lives around here though'], + ['spam', 'FreeMsg Hey there darling it's been 3 week's now...'], + ... +] +``` + +where the first column categorizes and sms as "ham" or "spam". The second +column contains the sms itself. + +Data set taken from: +https://www.kaggle.com/uciml/sms-spam-collection-dataset/downloads/sms-spam-collection-dataset.zip + +The transformation (1) reads the pickled data, (2) marks and shortens +messages categorized as spam, and (3) prints the output. + +''' + import bonobo +from bonobo.commands.run import get_default_services from fs.tarfs import TarFS -import os -def cleanse_sms(row): - +def cleanse_sms(**row): if row['category'] == 'spam': row['sms_clean'] = '**MARKED AS SPAM** ' + row['sms'][0:50] + ( '...' if len(row['sms']) > 50 else '' @@ -16,46 +44,21 @@ def cleanse_sms(row): graph = bonobo.Graph( - bonobo.PickleReader('spam.pkl' - ), # spam.pkl is within the gzipped tarball + # spam.pkl is within the gzipped tarball + bonobo.PickleReader('spam.pkl'), cleanse_sms, - print + bonobo.PrettyPrinter(), ) -if __name__ == '__main__': - ''' - This example shows how a different file system service can be injected - into a transformation (as compressing pickled objects often makes sense - anyways). The pickle itself contains a list of lists as follows: - ``` - [ - ['category', 'sms'], - ['ham', 'Go until jurong point, crazy..'], - ['ham', 'Ok lar... 
Joking wif u oni...'], - ['spam', 'Free entry in 2 a wkly comp to win...'], - ['ham', 'U dun say so early hor... U c already then say...'], - ['ham', 'Nah I don't think he goes to usf, he lives around here though'], - ['spam', 'FreeMsg Hey there darling it's been 3 week's now...'], - ... - ] - ``` - - where the first column categorizes and sms as "ham" or "spam". The second - column contains the sms itself. - - Data set taken from: - https://www.kaggle.com/uciml/sms-spam-collection-dataset/downloads/sms-spam-collection-dataset.zip - - The transformation (1) reads the pickled data, (2) marks and shortens - messages categorized as spam, and (3) prints the output. - ''' - - services = { +def get_services(): + return { 'fs': - TarFS( - os.path. - join(bonobo.get_examples_path(), 'datasets', 'spam.tgz') - ) + TarFS( + bonobo.get_examples_path('datasets/spam.tgz') + ) } - bonobo.run(graph, services=services) + + +if __name__ == '__main__': + bonobo.run(graph, services=get_default_services(__file__)) diff --git a/bonobo/nodes/io/csv.py b/bonobo/nodes/io/csv.py index e0412fa..bf3872d 100644 --- a/bonobo/nodes/io/csv.py +++ b/bonobo/nodes/io/csv.py @@ -3,10 +3,8 @@ import csv from bonobo.config import Option from bonobo.config.processors import ContextProcessor from bonobo.constants import NOT_MODIFIED -from bonobo.errors import ConfigurationError, ValidationError -from bonobo.structs import Bag +from bonobo.nodes.io.file import FileHandler, FileReader, FileWriter from bonobo.util.objects import ValueHolder -from .file import FileHandler, FileReader, FileWriter class CsvHandler(FileHandler): @@ -30,14 +28,6 @@ class CsvHandler(FileHandler): headers = Option(tuple) -def validate_csv_output_format(v): - if callable(v): - return v - if v in {'dict', 'kwargs'}: - return v - raise ValidationError('Unsupported format {!r}.'.format(v)) - - class CsvReader(CsvHandler, FileReader): """ Reads a CSV and yield the values as dicts. 
@@ -49,26 +39,17 @@ class CsvReader(CsvHandler, FileReader): """ skip = Option(int, default=0) - output_format = Option(validate_csv_output_format, default='dict') @ContextProcessor def csv_headers(self, context, fs, file): yield ValueHolder(self.headers) - def get_output_formater(self): - if callable(self.output_format): - return self.output_format - elif isinstance(self.output_format, str): - return getattr(self, '_format_as_' + self.output_format) - else: - raise ConfigurationError('Unsupported format {!r} for {}.'.format(self.output_format, type(self).__name__)) - def read(self, fs, file, headers): reader = csv.reader(file, delimiter=self.delimiter, quotechar=self.quotechar) - formater = self.get_output_formater() if not headers.get(): headers.set(next(reader)) + _headers = headers.get() field_count = len(headers) @@ -78,15 +59,9 @@ class CsvReader(CsvHandler, FileReader): for row in reader: if len(row) != field_count: - raise ValueError('Got a line with %d fields, expecting %d.' % (len(row), field_count, )) + raise ValueError('Got a line with %d fields, expecting %d.' 
% (len(row), field_count,)) - yield formater(headers.get(), row) - - def _format_as_dict(self, headers, values): - return dict(zip(headers, values)) - - def _format_as_kwargs(self, headers, values): - return Bag(**dict(zip(headers, values))) + yield self.get_output(dict(zip(_headers, row))) class CsvWriter(CsvHandler, FileWriter): @@ -96,7 +71,8 @@ class CsvWriter(CsvHandler, FileWriter): headers = ValueHolder(list(self.headers) if self.headers else None) yield writer, headers - def write(self, fs, file, lineno, writer, headers, row): + def write(self, fs, file, lineno, writer, headers, *args, **kwargs): + row = self.get_input(*args, **kwargs) if not lineno: headers.set(headers.value or row.keys()) writer.writerow(headers.get()) diff --git a/bonobo/nodes/io/file.py b/bonobo/nodes/io/file.py index b06fae3..3e2c51d 100644 --- a/bonobo/nodes/io/file.py +++ b/bonobo/nodes/io/file.py @@ -1,7 +1,9 @@ +from bonobo import settings from bonobo.config import Option, Service from bonobo.config.configurables import Configurable from bonobo.config.processors import ContextProcessor from bonobo.constants import NOT_MODIFIED +from bonobo.structs.bags import Bag from bonobo.util.objects import ValueHolder @@ -22,6 +24,8 @@ class FileHandler(Configurable): fs = Service('fs') # type: str + ioformat = Option(settings.validate_io_format, default=settings.IOFORMAT) + @ContextProcessor def file(self, context, fs): with self.open(fs) as file: @@ -30,15 +34,35 @@ class FileHandler(Configurable): def open(self, fs): return fs.open(self.path, self.mode, encoding=self.encoding) + def get_input(self, *args, **kwargs): + if self.ioformat == settings.IOFORMAT_ARG0: + assert len(args) == 1 and not len(kwargs), 'ARG0 format implies one arg and no kwargs.' + return args[0] + + if self.ioformat == settings.IOFORMAT_KWARGS: + assert len(args) == 0 and len(kwargs), 'KWARGS format implies no arg.' 
+ return kwargs + + raise NotImplementedError('Unsupported format.') + + def get_output(self, row): + if self.ioformat == settings.IOFORMAT_ARG0: + return row + + if self.ioformat == settings.IOFORMAT_KWARGS: + return Bag(**row) + + raise NotImplementedError('Unsupported format.') + class Reader(FileHandler): """Abstract component factory for readers. """ - def __call__(self, *args): - yield from self.read(*args) + def __call__(self, *args, **kwargs): + yield from self.read(*args, **kwargs) - def read(self, *args): + def read(self, *args, **kwargs): raise NotImplementedError('Abstract.') @@ -46,10 +70,10 @@ class Writer(FileHandler): """Abstract component factory for writers. """ - def __call__(self, *args): - return self.write(*args) + def __call__(self, *args, **kwargs): + return self.write(*args, **kwargs) - def write(self, *args): + def write(self, *args, **kwargs): raise NotImplementedError('Abstract.') diff --git a/bonobo/nodes/io/json.py b/bonobo/nodes/io/json.py index b2db708..74857db 100644 --- a/bonobo/nodes/io/json.py +++ b/bonobo/nodes/io/json.py @@ -14,7 +14,7 @@ class JsonReader(JsonHandler, FileReader): def read(self, fs, file): for line in self.loader(file): - yield line + yield self.get_output(line) class JsonWriter(JsonHandler, FileWriter): diff --git a/bonobo/nodes/io/pickle.py b/bonobo/nodes/io/pickle.py index cf6b5eb..c603e91 100644 --- a/bonobo/nodes/io/pickle.py +++ b/bonobo/nodes/io/pickle.py @@ -53,7 +53,7 @@ class PickleReader(PickleHandler, FileReader): if len(i) != item_count: raise ValueError('Received an object with %d items, expecting %d.' 
% (len(i), item_count, )) - yield dict(zip(i)) if is_dict else dict(zip(pickle_headers.value, i)) + yield self.get_output(dict(zip(i)) if is_dict else dict(zip(pickle_headers.value, i))) class PickleWriter(PickleHandler, FileWriter): diff --git a/bonobo/settings.py b/bonobo/settings.py index dda7ba7..9481bb2 100644 --- a/bonobo/settings.py +++ b/bonobo/settings.py @@ -2,6 +2,8 @@ import os import logging +from bonobo.errors import ValidationError + def to_bool(s): if len(s): @@ -23,7 +25,30 @@ QUIET = to_bool(os.environ.get('QUIET', 'f')) # Logging level. LOGGING_LEVEL = logging.DEBUG if DEBUG else logging.INFO +# Input/Output format for transformations +IOFORMAT_ARG0 = 'arg0' +IOFORMAT_KWARGS = 'kwargs' + +IOFORMATS = { + IOFORMAT_ARG0, + IOFORMAT_KWARGS, +} + +IOFORMAT = os.environ.get('IOFORMAT', IOFORMAT_KWARGS) + + +def validate_io_format(v): + if callable(v): + return v + if v in IOFORMATS: + return v + raise ValidationError('Unsupported format {!r}.'.format(v)) + def check(): if DEBUG and QUIET: raise RuntimeError('I cannot be verbose and quiet at the same time.') + + if IOFORMAT not in IOFORMATS: + raise RuntimeError('Invalid default input/output format.') +