179 lines
4.5 KiB
Python
179 lines
4.5 KiB
Python
from bonobo import Bag, Graph
|
|
from bonobo.strategies import NaiveStrategy
|
|
from bonobo.util.testing import BufferingNodeExecutionContext, BufferingGraphExecutionContext
|
|
|
|
|
|
def test_node_string():
|
|
def f():
|
|
return 'foo'
|
|
|
|
with BufferingNodeExecutionContext(f) as context:
|
|
context.write_sync(Bag())
|
|
output = context.get_buffer()
|
|
|
|
assert len(output) == 1
|
|
assert output[0] == 'foo'
|
|
|
|
def g():
|
|
yield 'foo'
|
|
yield 'bar'
|
|
|
|
with BufferingNodeExecutionContext(g) as context:
|
|
context.write_sync(Bag())
|
|
output = context.get_buffer()
|
|
|
|
assert len(output) == 2
|
|
assert output[0] == 'foo'
|
|
assert output[1] == 'bar'
|
|
|
|
|
|
def test_node_bytes():
|
|
def f():
|
|
return b'foo'
|
|
|
|
with BufferingNodeExecutionContext(f) as context:
|
|
context.write_sync(Bag())
|
|
|
|
output = context.get_buffer()
|
|
assert len(output) == 1
|
|
assert output[0] == b'foo'
|
|
|
|
def g():
|
|
yield b'foo'
|
|
yield b'bar'
|
|
|
|
with BufferingNodeExecutionContext(g) as context:
|
|
context.write_sync(Bag())
|
|
output = context.get_buffer()
|
|
|
|
assert len(output) == 2
|
|
assert output[0] == b'foo'
|
|
assert output[1] == b'bar'
|
|
|
|
|
|
def test_node_dict():
|
|
def f():
|
|
return {'id': 1, 'name': 'foo'}
|
|
|
|
with BufferingNodeExecutionContext(f) as context:
|
|
context.write_sync(Bag())
|
|
output = context.get_buffer()
|
|
|
|
assert len(output) == 1
|
|
assert output[0] == {'id': 1, 'name': 'foo'}
|
|
|
|
def g():
|
|
yield {'id': 1, 'name': 'foo'}
|
|
yield {'id': 2, 'name': 'bar'}
|
|
|
|
with BufferingNodeExecutionContext(g) as context:
|
|
context.write_sync(Bag())
|
|
output = context.get_buffer()
|
|
|
|
assert len(output) == 2
|
|
assert output[0] == {'id': 1, 'name': 'foo'}
|
|
assert output[1] == {'id': 2, 'name': 'bar'}
|
|
|
|
|
|
def test_node_dict_chained():
|
|
strategy = NaiveStrategy(GraphExecutionContextType=BufferingGraphExecutionContext)
|
|
|
|
def uppercase_name(**kwargs):
|
|
return {**kwargs, 'name': kwargs['name'].upper()}
|
|
|
|
def f():
|
|
return {'id': 1, 'name': 'foo'}
|
|
|
|
graph = Graph(f, uppercase_name)
|
|
context = strategy.execute(graph)
|
|
output = context.get_buffer()
|
|
|
|
assert len(output) == 1
|
|
assert output[0] == {'id': 1, 'name': 'FOO'}
|
|
|
|
def g():
|
|
yield {'id': 1, 'name': 'foo'}
|
|
yield {'id': 2, 'name': 'bar'}
|
|
|
|
graph = Graph(g, uppercase_name)
|
|
context = strategy.execute(graph)
|
|
output = context.get_buffer()
|
|
|
|
assert len(output) == 2
|
|
assert output[0] == {'id': 1, 'name': 'FOO'}
|
|
assert output[1] == {'id': 2, 'name': 'BAR'}
|
|
|
|
def test_node_tuple():
|
|
def f():
|
|
return 'foo', 'bar'
|
|
|
|
with BufferingNodeExecutionContext(f) as context:
|
|
context.write_sync(Bag())
|
|
output = context.get_buffer()
|
|
|
|
assert len(output) == 1
|
|
assert output[0] == ('foo', 'bar')
|
|
|
|
def g():
|
|
yield 'foo', 'bar'
|
|
yield 'foo', 'baz'
|
|
|
|
with BufferingNodeExecutionContext(g) as context:
|
|
context.write_sync(Bag())
|
|
output = context.get_buffer()
|
|
|
|
assert len(output) == 2
|
|
assert output[0] == ('foo', 'bar')
|
|
assert output[1] == ('foo', 'baz')
|
|
|
|
def test_node_tuple_chained():
|
|
strategy = NaiveStrategy(GraphExecutionContextType=BufferingGraphExecutionContext)
|
|
|
|
def uppercase(*args):
|
|
return tuple(map(str.upper, args))
|
|
|
|
def f():
|
|
return 'foo', 'bar'
|
|
|
|
graph = Graph(f, uppercase)
|
|
context = strategy.execute(graph)
|
|
output = context.get_buffer()
|
|
|
|
assert len(output) == 1
|
|
assert output[0] == ('FOO', 'BAR')
|
|
|
|
def g():
|
|
yield 'foo', 'bar'
|
|
yield 'foo', 'baz'
|
|
|
|
graph = Graph(g, uppercase)
|
|
context = strategy.execute(graph)
|
|
output = context.get_buffer()
|
|
|
|
assert len(output) == 2
|
|
assert output[0] == ('FOO', 'BAR')
|
|
assert output[1] == ('FOO', 'BAZ')
|
|
|
|
def test_node_tuple_dict():
|
|
def f():
|
|
return 'foo', 'bar', {'id': 1}
|
|
|
|
with BufferingNodeExecutionContext(f) as context:
|
|
context.write_sync(Bag())
|
|
output = context.get_buffer()
|
|
|
|
assert len(output) == 1
|
|
assert output[0] == ('foo', 'bar', {'id': 1})
|
|
|
|
def g():
|
|
yield 'foo', 'bar', {'id': 1}
|
|
yield 'foo', 'baz', {'id': 2}
|
|
|
|
with BufferingNodeExecutionContext(g) as context:
|
|
context.write_sync(Bag())
|
|
output = context.get_buffer()
|
|
|
|
assert len(output) == 2
|
|
assert output[0] == ('foo', 'bar', {'id': 1})
|
|
assert output[1] == ('foo', 'baz', {'id': 2})
|