Add MapFields transformation
This commit is contained in:
@ -29,6 +29,7 @@ from bonobo._api import (
|
|||||||
LdjsonReader,
|
LdjsonReader,
|
||||||
LdjsonWriter,
|
LdjsonWriter,
|
||||||
Limit,
|
Limit,
|
||||||
|
MapFields,
|
||||||
OrderFields,
|
OrderFields,
|
||||||
PickleReader,
|
PickleReader,
|
||||||
PickleWriter,
|
PickleWriter,
|
||||||
|
|||||||
@ -149,6 +149,7 @@ api.register_group(
|
|||||||
LdjsonReader,
|
LdjsonReader,
|
||||||
LdjsonWriter,
|
LdjsonWriter,
|
||||||
Limit,
|
Limit,
|
||||||
|
MapFields,
|
||||||
OrderFields,
|
OrderFields,
|
||||||
PickleReader,
|
PickleReader,
|
||||||
PickleWriter,
|
PickleWriter,
|
||||||
|
|||||||
@ -48,6 +48,10 @@ class UnrecoverableTypeError(UnrecoverableError, TypeError):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class UnrecoverableAttributeError(UnrecoverableError, AttributeError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class UnrecoverableValueError(UnrecoverableError, ValueError):
|
class UnrecoverableValueError(UnrecoverableError, ValueError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|||||||
@ -10,6 +10,7 @@ from bonobo.config import Configurable, Method, Option, use_context, use_no_inpu
|
|||||||
from bonobo.config.functools import transformation_factory
|
from bonobo.config.functools import transformation_factory
|
||||||
from bonobo.config.processors import ContextProcessor, use_context_processor
|
from bonobo.config.processors import ContextProcessor, use_context_processor
|
||||||
from bonobo.constants import NOT_MODIFIED
|
from bonobo.constants import NOT_MODIFIED
|
||||||
|
from bonobo.errors import UnrecoverableAttributeError
|
||||||
from bonobo.util.objects import ValueHolder
|
from bonobo.util.objects import ValueHolder
|
||||||
from bonobo.util.term import CLEAR_EOL
|
from bonobo.util.term import CLEAR_EOL
|
||||||
|
|
||||||
@ -18,6 +19,7 @@ __all__ = [
|
|||||||
"Format",
|
"Format",
|
||||||
"Limit",
|
"Limit",
|
||||||
"OrderFields",
|
"OrderFields",
|
||||||
|
"MapFields",
|
||||||
"PrettyPrinter",
|
"PrettyPrinter",
|
||||||
"Rename",
|
"Rename",
|
||||||
"SetFields",
|
"SetFields",
|
||||||
@ -314,6 +316,40 @@ def Format(**formats):
|
|||||||
return _Format
|
return _Format
|
||||||
|
|
||||||
|
|
||||||
|
@transformation_factory
|
||||||
|
def MapFields(function, key=True):
|
||||||
|
"""
|
||||||
|
Transformation factory that maps `function` on the values of a row.
|
||||||
|
It can be applied either to
|
||||||
|
1. all columns (`key=True`),
|
||||||
|
2. no column (`key=False`), or
|
||||||
|
3. a subset of columns by passing a callable, which takes column name and returns `bool`
|
||||||
|
(same as the parameter `function` in `filter`).
|
||||||
|
|
||||||
|
:param function: callable
|
||||||
|
:param key: bool or callable
|
||||||
|
:return: callable
|
||||||
|
"""
|
||||||
|
@use_raw_input
|
||||||
|
def _MapFields(bag):
|
||||||
|
try:
|
||||||
|
factory = type(bag)._make
|
||||||
|
except AttributeError as e:
|
||||||
|
raise UnrecoverableAttributeError('This transformation works only on objects with named'
|
||||||
|
' fields (namedtuple, BagType, ...).') from e
|
||||||
|
|
||||||
|
if callable(key):
|
||||||
|
return factory(
|
||||||
|
function(value) if key(key_) else value for key_, value in zip(bag._fields, bag)
|
||||||
|
)
|
||||||
|
elif key:
|
||||||
|
return factory(function(value) for value in bag)
|
||||||
|
else:
|
||||||
|
return NOT_MODIFIED
|
||||||
|
|
||||||
|
return _MapFields
|
||||||
|
|
||||||
|
|
||||||
def _count(self, context):
|
def _count(self, context):
|
||||||
counter = yield ValueHolder(0)
|
counter = yield ValueHolder(0)
|
||||||
context.send(counter.get())
|
context.send(counter.get())
|
||||||
|
|||||||
@ -7,6 +7,7 @@ import pytest
|
|||||||
import bonobo
|
import bonobo
|
||||||
from bonobo.constants import EMPTY, NOT_MODIFIED
|
from bonobo.constants import EMPTY, NOT_MODIFIED
|
||||||
from bonobo.util import ValueHolder, ensure_tuple
|
from bonobo.util import ValueHolder, ensure_tuple
|
||||||
|
from bonobo.util.bags import BagType
|
||||||
from bonobo.util.testing import BufferingNodeExecutionContext, ConfigurableNodeTest, StaticNodeTest
|
from bonobo.util.testing import BufferingNodeExecutionContext, ConfigurableNodeTest, StaticNodeTest
|
||||||
|
|
||||||
|
|
||||||
@ -113,3 +114,26 @@ def test_methodcaller():
|
|||||||
with BufferingNodeExecutionContext(methodcaller("zfill", 5)) as context:
|
with BufferingNodeExecutionContext(methodcaller("zfill", 5)) as context:
|
||||||
context.write_sync("a", "bb", "ccc")
|
context.write_sync("a", "bb", "ccc")
|
||||||
assert context.get_buffer() == list(map(ensure_tuple, ["0000a", "000bb", "00ccc"]))
|
assert context.get_buffer() == list(map(ensure_tuple, ["0000a", "000bb", "00ccc"]))
|
||||||
|
|
||||||
|
|
||||||
|
MyBag = BagType("MyBag", ["a", "b", "c"])
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("input_, key, expected", [
|
||||||
|
(MyBag(1, 2, 3), True, MyBag(1, 4, 9)),
|
||||||
|
(MyBag(1, 2, 3), False, MyBag(1, 2, 3)),
|
||||||
|
(MyBag(1, 2, 3), lambda x: x == 'c', MyBag(1, 2, 9)),
|
||||||
|
])
|
||||||
|
def test_map_fields(input_, key, expected):
|
||||||
|
with BufferingNodeExecutionContext(bonobo.MapFields(lambda x: x**2, key)) as context:
|
||||||
|
context.write_sync(input_)
|
||||||
|
assert context.status == '-'
|
||||||
|
[got] = context.get_buffer()
|
||||||
|
assert expected == got
|
||||||
|
|
||||||
|
|
||||||
|
def test_map_fields_error():
|
||||||
|
with BufferingNodeExecutionContext(bonobo.MapFields(lambda x: x**2)) as context:
|
||||||
|
context.write_sync(tuple())
|
||||||
|
assert context.status == '!'
|
||||||
|
assert context.defunct
|
||||||
|
|||||||
Reference in New Issue
Block a user