Files
bonobo/tests/execution/test_node.py

182 lines
4.5 KiB
Python

from bonobo import Bag, Graph
from bonobo.execution.strategies import NaiveStrategy
from bonobo.util.testing import BufferingNodeExecutionContext, BufferingGraphExecutionContext
def test_node_string():
def f():
return 'foo'
with BufferingNodeExecutionContext(f) as context:
context.write_sync(Bag())
output = context.get_buffer()
assert len(output) == 1
assert output[0] == 'foo'
def g():
yield 'foo'
yield 'bar'
with BufferingNodeExecutionContext(g) as context:
context.write_sync(Bag())
output = context.get_buffer()
assert len(output) == 2
assert output[0] == 'foo'
assert output[1] == 'bar'
def test_node_bytes():
def f():
return b'foo'
with BufferingNodeExecutionContext(f) as context:
context.write_sync(Bag())
output = context.get_buffer()
assert len(output) == 1
assert output[0] == b'foo'
def g():
yield b'foo'
yield b'bar'
with BufferingNodeExecutionContext(g) as context:
context.write_sync(Bag())
output = context.get_buffer()
assert len(output) == 2
assert output[0] == b'foo'
assert output[1] == b'bar'
def test_node_dict():
def f():
return {'id': 1, 'name': 'foo'}
with BufferingNodeExecutionContext(f) as context:
context.write_sync(Bag())
output = context.get_buffer()
assert len(output) == 1
assert output[0] == {'id': 1, 'name': 'foo'}
def g():
yield {'id': 1, 'name': 'foo'}
yield {'id': 2, 'name': 'bar'}
with BufferingNodeExecutionContext(g) as context:
context.write_sync(Bag())
output = context.get_buffer()
assert len(output) == 2
assert output[0] == {'id': 1, 'name': 'foo'}
assert output[1] == {'id': 2, 'name': 'bar'}
def test_node_dict_chained():
strategy = NaiveStrategy(GraphExecutionContextType=BufferingGraphExecutionContext)
def uppercase_name(**kwargs):
return {**kwargs, 'name': kwargs['name'].upper()}
def f():
return {'id': 1, 'name': 'foo'}
graph = Graph(f, uppercase_name)
context = strategy.execute(graph)
output = context.get_buffer()
assert len(output) == 1
assert output[0] == {'id': 1, 'name': 'FOO'}
def g():
yield {'id': 1, 'name': 'foo'}
yield {'id': 2, 'name': 'bar'}
graph = Graph(g, uppercase_name)
context = strategy.execute(graph)
output = context.get_buffer()
assert len(output) == 2
assert output[0] == {'id': 1, 'name': 'FOO'}
assert output[1] == {'id': 2, 'name': 'BAR'}
def test_node_tuple():
def f():
return 'foo', 'bar'
with BufferingNodeExecutionContext(f) as context:
context.write_sync(Bag())
output = context.get_buffer()
assert len(output) == 1
assert output[0] == ('foo', 'bar')
def g():
yield 'foo', 'bar'
yield 'foo', 'baz'
with BufferingNodeExecutionContext(g) as context:
context.write_sync(Bag())
output = context.get_buffer()
assert len(output) == 2
assert output[0] == ('foo', 'bar')
assert output[1] == ('foo', 'baz')
def test_node_tuple_chained():
strategy = NaiveStrategy(GraphExecutionContextType=BufferingGraphExecutionContext)
def uppercase(*args):
return tuple(map(str.upper, args))
def f():
return 'foo', 'bar'
graph = Graph(f, uppercase)
context = strategy.execute(graph)
output = context.get_buffer()
assert len(output) == 1
assert output[0] == ('FOO', 'BAR')
def g():
yield 'foo', 'bar'
yield 'foo', 'baz'
graph = Graph(g, uppercase)
context = strategy.execute(graph)
output = context.get_buffer()
assert len(output) == 2
assert output[0] == ('FOO', 'BAR')
assert output[1] == ('FOO', 'BAZ')
def test_node_tuple_dict():
def f():
return 'foo', 'bar', {'id': 1}
with BufferingNodeExecutionContext(f) as context:
context.write_sync(Bag())
output = context.get_buffer()
assert len(output) == 1
assert output[0] == ('foo', 'bar', {'id': 1})
def g():
yield 'foo', 'bar', {'id': 1}
yield 'foo', 'baz', {'id': 2}
with BufferingNodeExecutionContext(g) as context:
context.write_sync(Bag())
output = context.get_buffer()
assert len(output) == 2
assert output[0] == ('foo', 'bar', {'id': 1})
assert output[1] == ('foo', 'baz', {'id': 2})