The new bag implementation greatly improves how bonobo works, even though it is highly backward incompatible (sorry, that's needed, and better sooner than later). * The new implementation uses the same approach as Python's namedtuple, dynamically generating the code of the Python type. This has drawbacks, as it does not feel like the right way, but it also has a lot of benefits that cannot be achieved using a regular approach, especially the hardcoded constructor parameter order. * Memory usage is now much more efficient. The "keys" memory space will be used only once per "io type", being spent in the underlying type definition instead of in the actual instances. * Transformations now need to use tuples as output, which will be bound to their "output type". The output type can be inferred from the tuple length, or explicitly set by the user using either `context.set_output_type(...)` or `context.set_output_fields(...)` (to build a bag type from a list of field names). Jupyter/Graphviz integration is tighter, making it easy to display graphs in a notebook, or to display the live transformation status in an HTML table instead of a simple <div>. For now, context processors were hacked to keep working as before, but the current API is not satisfactory and should be replaced. As this new big change would be unreasonable without some time to work on it properly, it is postponed to the next versions (0.7, 0.8, ...). Maybe the best idea is to have some kind of "local services", that would use the same dependency injection mechanism as the execution-wide services. Services are now passed by keyword arguments only, to avoid confusion with data arguments.
227 lines
5.8 KiB
Python
227 lines
5.8 KiB
Python
from unittest.mock import MagicMock
|
|
|
|
import pytest
|
|
|
|
from bonobo import Graph
|
|
from bonobo.constants import EMPTY
|
|
from bonobo.execution.contexts.node import NodeExecutionContext
|
|
from bonobo.execution.strategies import NaiveStrategy
|
|
from bonobo.util.testing import BufferingNodeExecutionContext, BufferingGraphExecutionContext
|
|
|
|
|
|
def test_node_string():
    """Check the buffered rows produced by nodes that emit plain strings.

    A node returning ``'foo'`` is expected to leave exactly one row,
    ``('foo', )``, in the buffer. A generator node yielding ``'foo'`` then
    ``'bar'`` is expected to leave one 1-tuple row per yielded value, in
    the order they were yielded.
    """

    def returns_string():
        return 'foo'

    def yields_strings():
        yield 'foo'
        yield 'bar'

    # Each case pairs a node callable with the rows expected in the buffer.
    cases = (
        (returns_string, [('foo', )]),
        (yields_strings, [('foo', ), ('bar', )]),
    )

    for node, expected in cases:
        with BufferingNodeExecutionContext(node) as ctx:
            ctx.write_sync(EMPTY)
            rows = ctx.get_buffer()

            assert len(rows) == len(expected)
            for index, wanted in enumerate(expected):
                assert rows[index] == wanted
|
|
|
|
|
|
def test_node_bytes():
    """Check the buffered rows produced by nodes that emit bytes objects.

    A node returning ``b'foo'`` is expected to leave the single row
    ``(b'foo', )`` in the buffer. A generator node yielding ``b'foo'`` then
    ``b'bar'`` is expected to leave two 1-tuple rows, in that order.
    """

    def returns_bytes():
        return b'foo'

    def yields_bytes():
        yield b'foo'
        yield b'bar'

    # Each case pairs a node callable with the rows expected in the buffer.
    cases = (
        (returns_bytes, [(b'foo', )]),
        (yields_bytes, [(b'foo', ), (b'bar', )]),
    )

    for node, expected in cases:
        with BufferingNodeExecutionContext(node) as ctx:
            ctx.write_sync(EMPTY)
            rows = ctx.get_buffer()

            assert len(rows) == len(expected)
            for index, wanted in enumerate(expected):
                assert rows[index] == wanted
|
|
|
|
|
|
def test_node_dict():
    """Check the buffered rows produced by nodes that emit dictionaries.

    Every dict returned or yielded by the node is expected to show up in
    the buffer as a 1-tuple row wrapping an equal dict, in emission order.
    """

    def returns_dict():
        return {'id': 1, 'name': 'foo'}

    def yields_dicts():
        yield {'id': 1, 'name': 'foo'}
        yield {'id': 2, 'name': 'bar'}

    # Each case pairs a node callable with the rows expected in the buffer.
    cases = (
        (returns_dict, [({'id': 1, 'name': 'foo'}, )]),
        (
            yields_dicts,
            [
                ({'id': 1, 'name': 'foo'}, ),
                ({'id': 2, 'name': 'bar'}, ),
            ],
        ),
    )

    for node, expected in cases:
        with BufferingNodeExecutionContext(node) as ctx:
            ctx.write_sync(EMPTY)
            rows = ctx.get_buffer()

            assert len(rows) == len(expected)
            for index, wanted in enumerate(expected):
                assert rows[index] == wanted
|
|
|
|
|
|
def test_node_dict_chained():
    """Check dict rows flowing through a two-node graph.

    A producer node emits dicts and a second node returns a copy of each
    dict with its ``'name'`` value upper-cased. The graph is executed with
    a ``NaiveStrategy`` configured to use ``BufferingGraphExecutionContext``
    and the resulting buffer is expected to hold one 1-tuple row per
    emitted dict, in order.
    """
    strategy = NaiveStrategy(GraphExecutionContextType=BufferingGraphExecutionContext)

    def uppercase_name(values):
        # Copy first so the incoming mapping is never modified in place.
        renamed = dict(values)
        renamed['name'] = values['name'].upper()
        return renamed

    def returns_dict():
        return {'id': 1, 'name': 'foo'}

    def yields_dicts():
        yield {'id': 1, 'name': 'foo'}
        yield {'id': 2, 'name': 'bar'}

    # Each case pairs a producer node with the rows expected after chaining.
    cases = (
        (returns_dict, [({'id': 1, 'name': 'FOO'}, )]),
        (
            yields_dicts,
            [
                ({'id': 1, 'name': 'FOO'}, ),
                ({'id': 2, 'name': 'BAR'}, ),
            ],
        ),
    )

    for producer, expected in cases:
        rows = strategy.execute(Graph(producer, uppercase_name)).get_buffer()

        assert len(rows) == len(expected)
        for index, wanted in enumerate(expected):
            assert rows[index] == wanted
|
|
|
|
|
|
def test_node_tuple():
    """Check the buffered rows produced by nodes that emit tuples.

    A 2-tuple returned or yielded by the node is expected to appear in the
    buffer as a row equal to that same 2-tuple (no extra wrapping), in
    emission order.
    """

    def returns_pair():
        return 'foo', 'bar'

    def yields_pairs():
        yield 'foo', 'bar'
        yield 'foo', 'baz'

    # Each case pairs a node callable with the rows expected in the buffer.
    cases = (
        (returns_pair, [('foo', 'bar')]),
        (yields_pairs, [('foo', 'bar'), ('foo', 'baz')]),
    )

    for node, expected in cases:
        with BufferingNodeExecutionContext(node) as ctx:
            ctx.write_sync(EMPTY)
            rows = ctx.get_buffer()

            assert len(rows) == len(expected)
            for index, wanted in enumerate(expected):
                assert rows[index] == wanted
|
|
|
|
|
|
def test_node_tuple_chained():
    """Check tuple rows flowing through a two-node graph.

    A producer node emits 2-tuples of strings and a second node, which
    receives them as positional arguments, returns a tuple of their
    upper-cased values. The graph is executed with a ``NaiveStrategy``
    configured to use ``BufferingGraphExecutionContext`` and the buffer is
    expected to hold the upper-cased tuples, in order.
    """
    strategy = NaiveStrategy(GraphExecutionContextType=BufferingGraphExecutionContext)

    def uppercase(*args):
        return tuple(str.upper(arg) for arg in args)

    def returns_pair():
        return 'foo', 'bar'

    def yields_pairs():
        yield 'foo', 'bar'
        yield 'foo', 'baz'

    # Each case pairs a producer node with the rows expected after chaining.
    cases = (
        (returns_pair, [('FOO', 'BAR')]),
        (yields_pairs, [('FOO', 'BAR'), ('FOO', 'BAZ')]),
    )

    for producer, expected in cases:
        rows = strategy.execute(Graph(producer, uppercase)).get_buffer()

        assert len(rows) == len(expected)
        for index, wanted in enumerate(expected):
            assert rows[index] == wanted
|
|
|
|
|
|
def test_node_tuple_dict():
    """Check the buffered rows for tuples whose last item is a dict.

    A 3-tuple of two strings and a dict, returned or yielded by the node,
    is expected to appear in the buffer as a row equal to that same
    3-tuple, with the dict kept as the third element.
    """

    def returns_mixed():
        return 'foo', 'bar', {'id': 1}

    def yields_mixed():
        yield 'foo', 'bar', {'id': 1}
        yield 'foo', 'baz', {'id': 2}

    # Each case pairs a node callable with the rows expected in the buffer.
    cases = (
        (returns_mixed, [('foo', 'bar', {'id': 1})]),
        (
            yields_mixed,
            [
                ('foo', 'bar', {'id': 1}),
                ('foo', 'baz', {'id': 2}),
            ],
        ),
    )

    for node, expected in cases:
        with BufferingNodeExecutionContext(node) as ctx:
            ctx.write_sync(EMPTY)
            rows = ctx.get_buffer()

            assert len(rows) == len(expected)
            for index, wanted in enumerate(expected):
                assert rows[index] == wanted
|
|
|
|
|
|
def test_node_lifecycle_natural():
    """Walk a node context through start then stop and check its flags.

    The expectations encoded here are: a fresh context has none of
    ``started``/``stopped``/``killed``/``alive`` set; ``stop()`` before
    ``start()`` raises ``RuntimeError`` and changes nothing; ``start()``
    sets ``started`` and ``alive``; ``stop()`` then sets ``stopped`` and
    clears ``alive``.
    """
    ctx = NodeExecutionContext(MagicMock())

    def flags():
        # Snapshot as (started, stopped, killed, alive), reduced to plain
        # booleans so only truthiness is compared.
        return tuple(bool(flag) for flag in (ctx.started, ctx.stopped, ctx.killed, ctx.alive))

    pristine = (False, False, False, False)
    assert flags() == pristine

    # stopping a context that was never started is expected to fail ...
    with pytest.raises(RuntimeError):
        ctx.stop()
    # ... and to leave every flag untouched
    assert flags() == pristine

    # turn the key
    ctx.start()
    assert flags() == (True, False, False, True)

    ctx.stop()
    assert flags() == (True, True, False, False)
|
|
|
|
|
|
def test_node_lifecycle_with_kill():
    """Walk a node context through start, kill, then stop and check its flags.

    The expectations encoded here are: a fresh context has none of
    ``started``/``stopped``/``killed``/``alive`` set; ``kill()`` before
    ``start()`` raises ``RuntimeError`` and changes nothing; ``start()``
    sets ``started`` and ``alive``; ``kill()`` sets ``killed`` while the
    context is still ``alive`` and not ``stopped``; the final ``stop()``
    sets ``stopped`` and clears ``alive``.
    """
    ctx = NodeExecutionContext(MagicMock())

    def flags():
        # Snapshot as (started, stopped, killed, alive), reduced to plain
        # booleans so only truthiness is compared.
        return tuple(bool(flag) for flag in (ctx.started, ctx.stopped, ctx.killed, ctx.alive))

    pristine = (False, False, False, False)
    assert flags() == pristine

    # killing a context that was never started is expected to fail ...
    with pytest.raises(RuntimeError):
        ctx.kill()
    # ... and to leave every flag untouched
    assert flags() == pristine

    # turn the key
    ctx.start()
    assert flags() == (True, False, False, True)

    # a killed context is flagged as such but is still alive until stopped
    ctx.kill()
    assert flags() == (True, False, True, True)

    ctx.stop()
    assert flags() == (True, True, True, False)
|