Files
bonobo/tests/nodes/test_basics.py
2019-05-08 12:01:59 +02:00

145 lines
4.3 KiB
Python

from operator import methodcaller
from unittest import TestCase
from unittest.mock import MagicMock
import pytest
import bonobo
from bonobo.constants import EMPTY, NOT_MODIFIED
from bonobo.util import ValueHolder, ensure_tuple
from bonobo.util.bags import BagType
from bonobo.util.testing import BufferingNodeExecutionContext, ConfigurableNodeTest, StaticNodeTest
class CountTest(StaticNodeTest, TestCase):
    """Checks for ``bonobo.count``: direct calls and a buffered execution."""

    node = bonobo.count

    def test_counter_required(self):
        """``self.call()`` with no arguments is expected to raise ``TypeError``."""
        with pytest.raises(TypeError):
            self.call()

    def test_manual_call(self):
        """Three calls sharing one ``ValueHolder(0)`` leave it equal to 3."""
        tally = ValueHolder(0)
        for _ in range(3):
            self.call(tally)
        assert tally == 3

    def test_execution(self):
        """Writing 42 ``EMPTY`` inputs leaves exactly ``[(42,)]`` in the buffer."""
        empty_inputs = [EMPTY] * 42
        with self.execute() as context:
            context.write_sync(*empty_inputs)
        # The buffer is inspected only after the execution context has exited.
        assert context.get_buffer() == [(42,)]
class IdentityTest(StaticNodeTest, TestCase):
    """Checks for ``bonobo.identity``: a direct call and a buffered execution."""

    node = bonobo.identity

    def test_basic_call(self):
        """A direct call with 42 compares equal to 42."""
        result = self.call(42)
        assert result == 42

    def test_execution(self):
        """Each of 42 distinct objects appears in the buffer, in order, as a tuple."""
        sentinels = [object() for _ in range(42)]
        with self.execute() as context:
            context.write_sync(*sentinels)
        expected_rows = [ensure_tuple(item) for item in sentinels]
        assert context.get_buffer() == expected_rows
class LimitTest(ConfigurableNodeTest, TestCase):
    """Checks for ``bonobo.Limit`` with default and explicit limits."""

    @classmethod
    def setUpClass(cls):
        """Select ``bonobo.Limit`` as the node type used by this test case."""
        cls.NodeType = bonobo.Limit

    def test_execution_default(self):
        """With no argument to ``execute``, the first 10 of 42 inputs are buffered."""
        sentinels = [object() for _ in range(42)]
        with self.execute() as context:
            context.write_sync(*sentinels)
        expected_rows = [ensure_tuple(item) for item in sentinels[:10]]
        assert context.get_buffer() == expected_rows

    def test_execution_custom(self):
        """With ``execute(21)``, the first 21 of 42 inputs are buffered."""
        sentinels = [object() for _ in range(42)]
        with self.execute(21) as context:
            context.write_sync(*sentinels)
        expected_rows = [ensure_tuple(item) for item in sentinels[:21]]
        assert context.get_buffer() == expected_rows

    def test_manual(self):
        """Calling a ``NodeType(5)`` instance with 0..9 produces 5 items in total."""
        limiter = self.NodeType(5)
        emitted = []
        for value in range(10):
            emitted.extend(limiter(value))
        assert len(emitted) == 5

    def test_underflow(self):
        """Calling a ``NodeType(10)`` instance with 0..4 produces 5 items in total."""
        limiter = self.NodeType(10)
        emitted = []
        for value in range(5):
            emitted.extend(limiter(value))
        assert len(emitted) == 5
def test_tee():
    """``Tee`` returns ``NOT_MODIFIED`` on every call and calls the wrapped mock each time."""
    wrapped = MagicMock(side_effect=bonobo.identity)
    tee = bonobo.Tee(wrapped)

    outcomes = [tee("foo") for _ in range(10)]

    assert outcomes == [NOT_MODIFIED] * 10
    assert len(wrapped.mock_calls) == 10
def test_noop():
    """``bonobo.noop`` returns ``NOT_MODIFIED`` for mixed positional and keyword arguments."""
    result = bonobo.noop(1, 2, 3, 4, foo="bar")
    assert result == NOT_MODIFIED
def test_fixedwindow():
    """``FixedWindow`` groups inputs into fixed-size rows; a short last row is padded with ``None``."""
    # (window size, inputs, expected buffer) -- executed in this order.
    scenarios = (
        (2, range(10), [(0, 1), (2, 3), (4, 5), (6, 7), (8, 9)]),
        (2, range(9), [(0, 1), (2, 3), (4, 5), (6, 7), (8, None)]),
        (1, range(3), [(0,), (1,), (2,)]),
    )

    for size, inputs, expected_rows in scenarios:
        with BufferingNodeExecutionContext(bonobo.FixedWindow(size)) as context:
            context.write_sync(*inputs)
        assert context.get_buffer() == expected_rows
def test_methodcaller():
    """``operator.methodcaller`` objects can be used as nodes, with and without extra arguments."""
    # (node, inputs, expected raw outputs) -- executed in this order.
    scenarios = (
        (methodcaller("swapcase"), ("aaa", "bBb", "CcC"), ["AAA", "BbB", "cCc"]),
        (methodcaller("zfill", 5), ("a", "bb", "ccc"), ["0000a", "000bb", "00ccc"]),
    )

    for node, inputs, outputs in scenarios:
        with BufferingNodeExecutionContext(node) as context:
            context.write_sync(*inputs)
        expected_rows = [ensure_tuple(value) for value in outputs]
        assert context.get_buffer() == expected_rows
# Bag type built from the names "a", "b" and "c" (presumably its field names,
# in positional order -- confirm against BagType). Instances are used as inputs
# and expected outputs in the parametrized test_map_fields cases.
MyBag = BagType("MyBag", ["a", "b", "c"])
@pytest.mark.parametrize(
    "input_, key, expected",
    [
        (MyBag(1, 2, 3), True, MyBag(1, 4, 9)),
        (MyBag(1, 2, 3), False, MyBag(1, 2, 3)),
        (MyBag(1, 2, 3), lambda x: x == "c", MyBag(1, 2, 9)),
        ((1, 2, 3), True, (1, 4, 9)),
        ((1, 2, 3), False, (1, 2, 3)),
    ],
)
def test_map_fields(input_, key, expected):
    """``MapFields`` squares the values selected by ``key`` and emits exactly one row.

    ``key`` is ``True``, ``False`` or a predicate, as listed in the
    parametrization above.
    """
    node = bonobo.MapFields(lambda x: x ** 2, key)
    with BufferingNodeExecutionContext(node) as context:
        context.write_sync(input_)

    assert context.status == "-"
    # Unpacking into a one-element target also checks the buffer holds one row.
    (got,) = context.get_buffer()
    assert expected == got
def test_map_fields_error():
    """An empty-tuple input with a predicate key leaves the context with status ``"!"`` and defunct."""
    node = bonobo.MapFields(lambda x: x ** 2, lambda x: x == "c")
    with BufferingNodeExecutionContext(node) as context:
        context.write_sync(tuple())

    assert context.status == "!"
    assert context.defunct