Merge pull request #300 from KPilnacek/285_update_all_fields_in_a_row
Add MapFields transformation
This commit is contained in:
@ -29,6 +29,7 @@ from bonobo._api import (
|
||||
LdjsonReader,
|
||||
LdjsonWriter,
|
||||
Limit,
|
||||
MapFields,
|
||||
OrderFields,
|
||||
PickleReader,
|
||||
PickleWriter,
|
||||
|
||||
@ -149,6 +149,7 @@ api.register_group(
|
||||
LdjsonReader,
|
||||
LdjsonWriter,
|
||||
Limit,
|
||||
MapFields,
|
||||
OrderFields,
|
||||
PickleReader,
|
||||
PickleWriter,
|
||||
|
||||
@ -48,6 +48,10 @@ class UnrecoverableTypeError(UnrecoverableError, TypeError):
|
||||
pass
|
||||
|
||||
|
||||
class UnrecoverableAttributeError(UnrecoverableError, AttributeError):
|
||||
pass
|
||||
|
||||
|
||||
class UnrecoverableValueError(UnrecoverableError, ValueError):
|
||||
pass
|
||||
|
||||
|
||||
@ -10,6 +10,7 @@ from bonobo.config import Configurable, Method, Option, use_context, use_no_inpu
|
||||
from bonobo.config.functools import transformation_factory
|
||||
from bonobo.config.processors import ContextProcessor, use_context_processor
|
||||
from bonobo.constants import NOT_MODIFIED
|
||||
from bonobo.errors import UnrecoverableAttributeError
|
||||
from bonobo.util.objects import ValueHolder
|
||||
from bonobo.util.term import CLEAR_EOL
|
||||
|
||||
@ -18,6 +19,7 @@ __all__ = [
|
||||
"Format",
|
||||
"Limit",
|
||||
"OrderFields",
|
||||
"MapFields",
|
||||
"PrettyPrinter",
|
||||
"Rename",
|
||||
"SetFields",
|
||||
@ -314,6 +316,46 @@ def Format(**formats):
|
||||
return _Format
|
||||
|
||||
|
||||
@transformation_factory
|
||||
def MapFields(function, key=True):
|
||||
"""
|
||||
Transformation factory that maps `function` on the values of a row.
|
||||
It can be applied either to
|
||||
1. all columns (`key=True`),
|
||||
2. no column (`key=False`), or
|
||||
3. a subset of columns by passing a callable, which takes column name and returns `bool`
|
||||
(same as the parameter `function` in `filter`).
|
||||
|
||||
:param function: callable
|
||||
:param key: bool or callable
|
||||
:return: callable
|
||||
"""
|
||||
@use_raw_input
|
||||
def _MapFields(bag):
|
||||
try:
|
||||
factory = type(bag)._make
|
||||
except AttributeError:
|
||||
factory = type(bag)
|
||||
|
||||
if callable(key):
|
||||
try:
|
||||
fields = bag._fields
|
||||
except AttributeError as e:
|
||||
raise UnrecoverableAttributeError(
|
||||
'This transformation works only on objects with named'
|
||||
' fields (namedtuple, BagType, ...).') from e
|
||||
|
||||
return factory(
|
||||
function(value) if key(key_) else value for key_, value in zip(fields, bag)
|
||||
)
|
||||
elif key:
|
||||
return factory(function(value) for value in bag)
|
||||
else:
|
||||
return NOT_MODIFIED
|
||||
|
||||
return _MapFields
|
||||
|
||||
|
||||
def _count(self, context):
|
||||
counter = yield ValueHolder(0)
|
||||
context.send(counter.get())
|
||||
|
||||
@ -7,6 +7,7 @@ import pytest
|
||||
import bonobo
|
||||
from bonobo.constants import EMPTY, NOT_MODIFIED
|
||||
from bonobo.util import ValueHolder, ensure_tuple
|
||||
from bonobo.util.bags import BagType
|
||||
from bonobo.util.testing import BufferingNodeExecutionContext, ConfigurableNodeTest, StaticNodeTest
|
||||
|
||||
|
||||
@ -113,3 +114,28 @@ def test_methodcaller():
|
||||
with BufferingNodeExecutionContext(methodcaller("zfill", 5)) as context:
|
||||
context.write_sync("a", "bb", "ccc")
|
||||
assert context.get_buffer() == list(map(ensure_tuple, ["0000a", "000bb", "00ccc"]))
|
||||
|
||||
|
||||
MyBag = BagType("MyBag", ["a", "b", "c"])
|
||||
|
||||
|
||||
@pytest.mark.parametrize("input_, key, expected", [
|
||||
(MyBag(1, 2, 3), True, MyBag(1, 4, 9)),
|
||||
(MyBag(1, 2, 3), False, MyBag(1, 2, 3)),
|
||||
(MyBag(1, 2, 3), lambda x: x == 'c', MyBag(1, 2, 9)),
|
||||
((1, 2, 3), True, (1, 4, 9)),
|
||||
((1, 2, 3), False, (1, 2, 3)),
|
||||
])
|
||||
def test_map_fields(input_, key, expected):
|
||||
with BufferingNodeExecutionContext(bonobo.MapFields(lambda x: x**2, key)) as context:
|
||||
context.write_sync(input_)
|
||||
assert context.status == '-'
|
||||
[got] = context.get_buffer()
|
||||
assert expected == got
|
||||
|
||||
|
||||
def test_map_fields_error():
|
||||
with BufferingNodeExecutionContext(bonobo.MapFields(lambda x: x**2, lambda x: x == 'c')) as context:
|
||||
context.write_sync(tuple())
|
||||
assert context.status == '!'
|
||||
assert context.defunct
|
||||
|
||||
Reference in New Issue
Block a user