[stdlib] Fix I/O related nodes (especially json), there were bad bugs with ioformat.
This commit is contained in:
@ -70,9 +70,9 @@ def set_level(level):
|
||||
def get_logger(name='bonobo'):
    """Return the standard-library logger registered under ``name``.

    :param name: logger name handed to ``logging.getLogger``; defaults to ``'bonobo'``.
    :return: the ``logging.Logger`` instance for that name.
    """
    logger = logging.getLogger(name)
    return logger
|
||||
|
||||
|
||||
# Compatibility with python logging
|
||||
getLogger = get_logger
|
||||
|
||||
|
||||
# Setup formatting and level.
|
||||
setup(level=settings.LOGGING_LEVEL)
|
||||
|
||||
82
bonobo/nodes/io/base.py
Normal file
82
bonobo/nodes/io/base.py
Normal file
@ -0,0 +1,82 @@
|
||||
from bonobo import settings
|
||||
from bonobo.config import Configurable, ContextProcessor, Option, Service
|
||||
from bonobo.structs.bags import Bag
|
||||
|
||||
|
||||
class IOFormatEnabled(Configurable):
    """Configurable mixin converting between call arguments and rows.

    The conversion is driven by the ``ioformat`` option, which is compared
    against ``settings.IOFORMAT_ARG0`` and ``settings.IOFORMAT_KWARGS``.
    """

    ioformat = Option(default=settings.IOFORMAT.get)

    def get_input(self, *args, **kwargs):
        """Extract the input row from the call arguments.

        With the ARG0 format the single positional argument is returned;
        with the KWARGS format the keyword arguments dict is returned.

        Raises:
            ValueError: if the arguments do not match the configured format.
            NotImplementedError: if ``ioformat`` is neither ARG0 nor KWARGS.
        """
        if self.ioformat == settings.IOFORMAT_ARG0:
            # Exactly one positional argument and nothing else is acceptable.
            if len(args) == 1 and not kwargs:
                return args[0]
            message = 'Wrong input formating: IOFORMAT=ARG0 implies one arg and no kwargs, got args={!r} and kwargs={!r}.'
            raise ValueError(message.format(args, kwargs))

        if self.ioformat == settings.IOFORMAT_KWARGS:
            # Keyword arguments only, and at least one of them.
            if kwargs and not args:
                return kwargs
            message = 'Wrong input formating: IOFORMAT=KWARGS ioformat implies no arg, got args={!r} and kwargs={!r}.'
            raise ValueError(message.format(args, kwargs))

        raise NotImplementedError('Unsupported format.')

    def get_output(self, row):
        """Wrap ``row`` for output according to the configured format.

        ARG0 returns ``row`` untouched; KWARGS returns ``Bag(**row)``.

        Raises:
            NotImplementedError: if ``ioformat`` is neither ARG0 nor KWARGS.
        """
        if self.ioformat == settings.IOFORMAT_ARG0:
            return row

        if self.ioformat == settings.IOFORMAT_KWARGS:
            return Bag(**row)

        raise NotImplementedError('Unsupported format.')
|
||||
|
||||
|
||||
class FileHandler(Configurable):
    """Abstract component factory for file-related components.

    Args:
        path (str): which path to use within the provided filesystem.
        eol (str): which character to use to separate lines.
        mode (str): which mode to use when opening the file.
        encoding (str): encoding forwarded to ``fs.open`` (``'utf-8'`` by default).
        fs (str): service name to use for filesystem.

    """

    path = Option(str, required=True, positional=True)  # type: str
    eol = Option(str, default='\n')  # type: str
    mode = Option(str)  # type: str
    encoding = Option(str, default='utf-8')  # type: str
    fs = Service('fs')  # type: str

    @ContextProcessor
    def file(self, context, fs):
        """Open the configured file and yield its handle.

        The handle is obtained inside a ``with`` block, so it is closed once
        the generator resumes after the ``yield``.
        """
        with self.open(fs) as handle:
            yield handle

    def open(self, fs):
        """Open ``self.path`` on ``fs`` with the configured mode and encoding."""
        opened = fs.open(self.path, self.mode, encoding=self.encoding)
        return opened
|
||||
|
||||
|
||||
class Reader:
    """Abstract component factory for readers.

    Calling an instance delegates to :meth:`read` and yields whatever it
    produces; subclasses are expected to override :meth:`read`.
    """

    def read(self, *args, **kwargs):
        """Produce rows; must be overridden by concrete readers."""
        raise NotImplementedError('Abstract.')

    def __call__(self, *args, **kwargs):
        """Yield every item produced by ``self.read`` for the same arguments."""
        rows = self.read(*args, **kwargs)
        yield from rows
|
||||
|
||||
|
||||
class Writer:
    """Abstract component factory for writers.

    Calling an instance delegates to :meth:`write` with the same positional
    and keyword arguments; subclasses are expected to override :meth:`write`.
    """

    def write(self, *args, **kwargs):
        """Write one item; must be overridden by concrete writers."""
        raise NotImplementedError('Abstract.')

    def __call__(self, *args, **kwargs):
        """Return the result of ``self.write`` for the given arguments."""
        outcome = self.write(*args, **kwargs)
        return outcome
|
||||
@ -3,7 +3,8 @@ import csv
|
||||
from bonobo.config import Option
|
||||
from bonobo.config.processors import ContextProcessor
|
||||
from bonobo.constants import NOT_MODIFIED
|
||||
from bonobo.nodes.io.file import FileHandler, FileReader, FileWriter
|
||||
from bonobo.nodes.io.file import FileReader, FileWriter
|
||||
from bonobo.nodes.io.base import FileHandler, IOFormatEnabled
|
||||
from bonobo.util.objects import ValueHolder
|
||||
|
||||
|
||||
@ -28,7 +29,7 @@ class CsvHandler(FileHandler):
|
||||
headers = Option(tuple)
|
||||
|
||||
|
||||
class CsvReader(CsvHandler, FileReader):
|
||||
class CsvReader(IOFormatEnabled, FileReader, CsvHandler):
|
||||
"""
|
||||
Reads a CSV and yield the values as dicts.
|
||||
|
||||
@ -59,12 +60,12 @@ class CsvReader(CsvHandler, FileReader):
|
||||
|
||||
for row in reader:
|
||||
if len(row) != field_count:
|
||||
raise ValueError('Got a line with %d fields, expecting %d.' % (len(row), field_count,))
|
||||
raise ValueError('Got a line with %d fields, expecting %d.' % (len(row), field_count, ))
|
||||
|
||||
yield self.get_output(dict(zip(_headers, row)))
|
||||
|
||||
|
||||
class CsvWriter(CsvHandler, FileWriter):
|
||||
class CsvWriter(IOFormatEnabled, FileWriter, CsvHandler):
|
||||
@ContextProcessor
|
||||
def writer(self, context, fs, file, lineno):
|
||||
writer = csv.writer(file, delimiter=self.delimiter, quotechar=self.quotechar, lineterminator=self.eol)
|
||||
|
||||
@ -1,81 +1,11 @@
|
||||
from bonobo import settings
|
||||
from bonobo.config import Option, Service
|
||||
from bonobo.config.configurables import Configurable
|
||||
from bonobo.config import Option
|
||||
from bonobo.config.processors import ContextProcessor
|
||||
from bonobo.constants import NOT_MODIFIED
|
||||
from bonobo.structs.bags import Bag
|
||||
from bonobo.nodes.io.base import FileHandler, Reader, Writer
|
||||
from bonobo.util.objects import ValueHolder
|
||||
|
||||
|
||||
class FileHandler(Configurable):
    """Abstract component factory for file-related components.

    Args:
        path (str): which path to use within the provided filesystem.
        eol (str): which character to use to separate lines.
        mode (str): which mode to use when opening the file.
        encoding (str): encoding forwarded to ``fs.open`` (``'utf-8'`` by default).
        fs (str): service name to use for filesystem.
        ioformat: input/output format, compared against
            ``settings.IOFORMAT_ARG0`` and ``settings.IOFORMAT_KWARGS``.

    """

    path = Option(str, required=True, positional=True)  # type: str
    eol = Option(str, default='\n')  # type: str
    mode = Option(str)  # type: str
    encoding = Option(str, default='utf-8')  # type: str
    fs = Service('fs')  # type: str
    ioformat = Option(default=settings.IOFORMAT.get)

    @ContextProcessor
    def file(self, context, fs):
        """Open the configured file and yield its handle.

        The handle is obtained inside a ``with`` block, so it is closed once
        the generator resumes after the ``yield``.
        """
        with self.open(fs) as file:
            yield file

    def open(self, fs):
        """Open ``self.path`` on ``fs`` with the configured mode and encoding."""
        return fs.open(self.path, self.mode, encoding=self.encoding)

    def get_input(self, *args, **kwargs):
        """Extract the input row from the call arguments.

        With the ARG0 format the single positional argument is returned;
        with the KWARGS format the keyword arguments dict is returned.

        Raises:
            ValueError: if the arguments do not match the configured format.
            NotImplementedError: if ``ioformat`` is neither ARG0 nor KWARGS.
        """
        # Explicit exceptions rather than ``assert``: assertions are removed
        # when Python runs with -O, which would let malformed input through.
        if self.ioformat == settings.IOFORMAT_ARG0:
            if len(args) != 1 or kwargs:
                raise ValueError(
                    'Wrong input formating: IOFORMAT=ARG0 implies one arg and no kwargs, '
                    'got args={!r} and kwargs={!r}.'.format(args, kwargs)
                )
            return args[0]

        if self.ioformat == settings.IOFORMAT_KWARGS:
            if args or not kwargs:
                raise ValueError(
                    'Wrong input formating: IOFORMAT=KWARGS ioformat implies no arg, '
                    'got args={!r} and kwargs={!r}.'.format(args, kwargs)
                )
            return kwargs

        raise NotImplementedError('Unsupported format.')

    def get_output(self, row):
        """Wrap ``row`` for output according to the configured format.

        ARG0 returns ``row`` untouched; KWARGS returns ``Bag(**row)``.

        Raises:
            NotImplementedError: if ``ioformat`` is neither ARG0 nor KWARGS.
        """
        if self.ioformat == settings.IOFORMAT_ARG0:
            return row

        if self.ioformat == settings.IOFORMAT_KWARGS:
            return Bag(**row)

        raise NotImplementedError('Unsupported format.')
|
||||
|
||||
|
||||
class Reader(FileHandler):
    """Abstract component factory for readers.

    Calling an instance delegates to :meth:`read` and yields whatever it
    produces; subclasses are expected to override :meth:`read`.
    """

    def read(self, *args, **kwargs):
        """Produce rows; must be overridden by concrete readers."""
        raise NotImplementedError('Abstract.')

    def __call__(self, *args, **kwargs):
        """Yield every item produced by ``self.read`` for the same arguments."""
        rows = self.read(*args, **kwargs)
        yield from rows
|
||||
|
||||
|
||||
class Writer(FileHandler):
    """Abstract component factory for writers.

    Calling an instance delegates to :meth:`write` with the same positional
    and keyword arguments; subclasses are expected to override :meth:`write`.
    """

    def __call__(self, *args, **kwargs):
        """Return the result of ``self.write`` for the given arguments."""
        # Keyword arguments must be forwarded as well: dropping them here
        # silently discards rows that arrive as kwargs.
        return self.write(*args, **kwargs)

    def write(self, *args, **kwargs):
        """Write one item; must be overridden by concrete writers."""
        raise NotImplementedError('Abstract.')
|
||||
|
||||
|
||||
class FileReader(Reader):
|
||||
class FileReader(Reader, FileHandler):
|
||||
"""Component factory for file-like readers.
|
||||
|
||||
On its own, it can be used to read a file and yield one row per line, trimming the "eol" character at the end if
|
||||
@ -93,7 +23,7 @@ class FileReader(Reader):
|
||||
yield line.rstrip(self.eol)
|
||||
|
||||
|
||||
class FileWriter(Writer):
|
||||
class FileWriter(Writer, FileHandler):
|
||||
"""Component factory for file or file-like writers.
|
||||
|
||||
On its own, it can be used to write in a file one line per row that comes into this component. Extending it is
|
||||
@ -107,11 +37,11 @@ class FileWriter(Writer):
|
||||
lineno = ValueHolder(0)
|
||||
yield lineno
|
||||
|
||||
def write(self, fs, file, lineno, row):
|
||||
def write(self, fs, file, lineno, line):
|
||||
"""
|
||||
Write a row on the next line of opened file in context.
|
||||
"""
|
||||
self._write_line(file, (self.eol if lineno.value else '') + row)
|
||||
self._write_line(file, (self.eol if lineno.value else '') + line)
|
||||
lineno += 1
|
||||
return NOT_MODIFIED
|
||||
|
||||
|
||||
@ -1,15 +1,17 @@
|
||||
import json
|
||||
|
||||
from bonobo.config.processors import ContextProcessor
|
||||
from bonobo.nodes.io.file import FileWriter, FileReader
|
||||
from bonobo.constants import NOT_MODIFIED
|
||||
from bonobo.nodes.io.base import FileHandler, IOFormatEnabled
|
||||
from bonobo.nodes.io.file import FileReader, FileWriter
|
||||
|
||||
|
||||
class JsonHandler():
|
||||
class JsonHandler(FileHandler):
|
||||
eol = ',\n'
|
||||
prefix, suffix = '[', ']'
|
||||
|
||||
|
||||
class JsonReader(JsonHandler, FileReader):
|
||||
class JsonReader(IOFormatEnabled, FileReader, JsonHandler):
|
||||
loader = staticmethod(json.load)
|
||||
|
||||
def read(self, fs, file):
|
||||
@ -17,18 +19,21 @@ class JsonReader(JsonHandler, FileReader):
|
||||
yield self.get_output(line)
|
||||
|
||||
|
||||
class JsonWriter(JsonHandler, FileWriter):
|
||||
class JsonWriter(IOFormatEnabled, FileWriter, JsonHandler):
|
||||
@ContextProcessor
|
||||
def envelope(self, context, fs, file, lineno):
|
||||
file.write(self.prefix)
|
||||
yield
|
||||
file.write(self.suffix)
|
||||
|
||||
def write(self, fs, file, lineno, row):
|
||||
def write(self, fs, file, lineno, *args, **kwargs):
|
||||
"""
|
||||
Write a json row on the next line of file pointed by ctx.file.
|
||||
|
||||
:param ctx:
|
||||
:param row:
|
||||
"""
|
||||
return super().write(fs, file, lineno, json.dumps(row))
|
||||
row = self.get_input(*args, **kwargs)
|
||||
self._write_line(file, (self.eol if lineno.value else '') + json.dumps(row))
|
||||
lineno += 1
|
||||
return NOT_MODIFIED
|
||||
|
||||
@ -1,10 +1,11 @@
|
||||
import pickle
|
||||
|
||||
from bonobo.config.processors import ContextProcessor
|
||||
from bonobo.config import Option
|
||||
from bonobo.config.processors import ContextProcessor
|
||||
from bonobo.constants import NOT_MODIFIED
|
||||
from bonobo.nodes.io.base import FileHandler, IOFormatEnabled
|
||||
from bonobo.nodes.io.file import FileReader, FileWriter
|
||||
from bonobo.util.objects import ValueHolder
|
||||
from .file import FileReader, FileWriter, FileHandler
|
||||
|
||||
|
||||
class PickleHandler(FileHandler):
|
||||
@ -19,7 +20,7 @@ class PickleHandler(FileHandler):
|
||||
item_names = Option(tuple)
|
||||
|
||||
|
||||
class PickleReader(PickleHandler, FileReader):
|
||||
class PickleReader(IOFormatEnabled, FileReader, PickleHandler):
|
||||
"""
|
||||
Reads a Python pickle object and yields the items in dicts.
|
||||
"""
|
||||
@ -56,8 +57,7 @@ class PickleReader(PickleHandler, FileReader):
|
||||
yield self.get_output(dict(zip(i)) if is_dict else dict(zip(pickle_headers.value, i)))
|
||||
|
||||
|
||||
class PickleWriter(PickleHandler, FileWriter):
|
||||
|
||||
class PickleWriter(IOFormatEnabled, FileWriter, PickleHandler):
|
||||
mode = Option(str, default='wb')
|
||||
|
||||
def write(self, fs, file, lineno, item):
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
from contextlib import contextmanager
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from bonobo import open_fs
|
||||
from bonobo.execution.node import NodeExecutionContext
|
||||
|
||||
|
||||
@ -17,3 +18,20 @@ def optional_contextmanager(cm, *, ignore=False):
|
||||
else:
|
||||
with cm:
|
||||
yield
|
||||
|
||||
|
||||
class FilesystemTester:
    """Test helper building filesystem services around a temporary directory.

    ``extension`` is appended to the ``input.`` / ``output.`` file names,
    ``mode`` is the mode used to write ``input_data`` into the input file.
    """

    def __init__(self, extension='txt', mode='w'):
        self.extension = extension
        self.mode = mode
        # Content written to the input file; callers assign it before use.
        self.input_data = ''

    def get_services_for_reader(self, tmpdir):
        """Create ``input.<extension>`` in ``tmpdir`` filled with ``input_data``.

        :return: tuple ``(fs, filename, services)`` where ``services`` is
            ``{'fs': fs}``.
        """
        fs = open_fs(tmpdir)
        filename = 'input.' + self.extension
        with fs.open(filename, self.mode) as stream:
            stream.write(self.input_data)
        services = {'fs': fs}
        return fs, filename, services

    def get_services_for_writer(self, tmpdir):
        """Return ``(fs, filename, services)`` for ``output.<extension>``.

        No file is created here; only the filesystem is opened on ``tmpdir``.
        """
        fs = open_fs(tmpdir)
        filename = 'output.' + self.extension
        services = {'fs': fs}
        return fs, filename, services
|
||||
|
||||
Reference in New Issue
Block a user