Files
bonobo/tests/nodes/test_basics.py
2018-08-11 06:34:37 +02:00

116 lines
3.5 KiB
Python

from operator import methodcaller
from unittest import TestCase
from unittest.mock import MagicMock
import pytest
import bonobo
from bonobo.constants import EMPTY, NOT_MODIFIED
from bonobo.util import ValueHolder, ensure_tuple
from bonobo.util.testing import BufferingNodeExecutionContext, ConfigurableNodeTest, StaticNodeTest
class CountTest(StaticNodeTest, TestCase):
node = bonobo.count
def test_counter_required(self):
with pytest.raises(TypeError):
self.call()
def test_manual_call(self):
counter = ValueHolder(0)
for i in range(3):
self.call(counter)
assert counter == 3
def test_execution(self):
with self.execute() as context:
context.write_sync(*([EMPTY] * 42))
assert context.get_buffer() == [(42,)]
class IdentityTest(StaticNodeTest, TestCase):
node = bonobo.identity
def test_basic_call(self):
assert self.call(42) == 42
def test_execution(self):
object_list = [object() for _ in range(42)]
with self.execute() as context:
context.write_sync(*object_list)
assert context.get_buffer() == list(map(ensure_tuple, object_list))
class LimitTest(ConfigurableNodeTest, TestCase):
@classmethod
def setUpClass(cls):
cls.NodeType = bonobo.Limit
def test_execution_default(self):
object_list = [object() for _ in range(42)]
with self.execute() as context:
context.write_sync(*object_list)
assert context.get_buffer() == list(map(ensure_tuple, object_list[:10]))
def test_execution_custom(self):
object_list = [object() for _ in range(42)]
with self.execute(21) as context:
context.write_sync(*object_list)
assert context.get_buffer() == list(map(ensure_tuple, object_list[:21]))
def test_manual(self):
limit = self.NodeType(5)
buffer = []
for x in range(10):
buffer += list(limit(x))
assert len(buffer) == 5
def test_underflow(self):
limit = self.NodeType(10)
buffer = []
for x in range(5):
buffer += list(limit(x))
assert len(buffer) == 5
def test_tee():
inner = MagicMock(side_effect=bonobo.identity)
tee = bonobo.Tee(inner)
results = []
for i in range(10):
results.append(tee('foo'))
assert results == [NOT_MODIFIED] * 10
assert len(inner.mock_calls) == 10
def test_noop():
assert bonobo.noop(1, 2, 3, 4, foo='bar') == NOT_MODIFIED
def test_fixedwindow():
with BufferingNodeExecutionContext(bonobo.FixedWindow(2)) as context:
context.write_sync(*range(10))
assert context.get_buffer() == [(0, 1), (2, 3), (4, 5), (6, 7), (8, 9)]
with BufferingNodeExecutionContext(bonobo.FixedWindow(2)) as context:
context.write_sync(*range(9))
assert context.get_buffer() == [(0, 1), (2, 3), (4, 5), (6, 7), (8, None)]
with BufferingNodeExecutionContext(bonobo.FixedWindow(1)) as context:
context.write_sync(*range(3))
assert context.get_buffer() == [(0,), (1,), (2,)]
def test_methodcaller():
with BufferingNodeExecutionContext(methodcaller('swapcase')) as context:
context.write_sync('aaa', 'bBb', 'CcC')
assert context.get_buffer() == list(map(ensure_tuple, ['AAA', 'BbB', 'cCc']))
with BufferingNodeExecutionContext(methodcaller('zfill', 5)) as context:
context.write_sync('a', 'bb', 'ccc')
assert context.get_buffer() == list(map(ensure_tuple, ['0000a', '000bb', '00ccc']))