Files
bonobo/tests/execution/contexts/test_execution_contexts_graph.py
2018-07-11 17:09:53 +02:00

60 lines
1.7 KiB
Python

from bonobo import Graph
from bonobo.execution.contexts import GraphExecutionContext
def raise_an_error(*args, **kwargs):
    """Raise a generic ``Exception``, ignoring every argument.

    Used as the failing step of the graph built in
    ``test_lifecycle_of_graph_with_recoverable_error``.

    Args:
        *args: Accepted and ignored.
        **kwargs: Accepted and ignored.

    Raises:
        Exception: Always, with a fixed message.
    """
    raise Exception("Careful, man, there's a beverage here!")
def raise_an_unrecoverrable_error(*args, **kwargs):
    """Raise a generic ``Exception``, ignoring every argument.

    Used as the failing step of the graph built in
    ``test_lifecycle_of_graph_with_unrecoverable_error``. The spelling of the
    function name is kept as-is because that test refers to it by this name.

    Args:
        *args: Accepted and ignored.
        **kwargs: Accepted and ignored.

    Raises:
        Exception: Always, with a fixed message.
    """
    raise Exception('You are entering a world of pain!')
def test_lifecycle_of_empty_graph():
    """Check the started/alive/stopped flags of a context over an empty graph.

    The flags are asserted twice: once while the ``with`` block is open and
    once after it has been left.
    """
    graph = Graph()

    with GraphExecutionContext(graph) as context:
        # While the context manager is entered.
        assert context.started
        assert context.alive
        assert not context.stopped

    # After the context manager has exited.
    assert context.started
    assert not context.alive
    assert context.stopped
    assert not context.xstatus
def test_lifecycle_of_nonempty_graph():
    """Check the lifecycle flags of a context over a two-step graph.

    The graph chains the list ``[1, 2, 3]`` into ``print``. The flags are
    asserted while the ``with`` block is open and again after it has been left.
    """
    graph = Graph([1, 2, 3], print)

    with GraphExecutionContext(graph) as context:
        # While the context manager is entered.
        assert context.started
        assert context.alive
        assert not context.stopped

    # After the context manager has exited.
    assert context.started
    assert not context.alive
    assert context.stopped
    assert not context.xstatus
def test_lifecycle_of_graph_with_recoverable_error():
    """Check the lifecycle flags when the graph contains ``raise_an_error``.

    The graph chains ``[1, 2, 3]`` into ``raise_an_error`` and then ``print``.
    The flags are asserted while the ``with`` block is open and again after it
    has been left.
    """
    graph = Graph([1, 2, 3], raise_an_error, print)

    with GraphExecutionContext(graph) as context:
        # While the context manager is entered.
        # NOTE(review): unlike the unrecoverable-error test, this one never
        # calls context.loop() -- confirm the raising step actually runs here.
        assert context.started
        assert context.alive
        assert not context.stopped

    # After the context manager has exited.
    assert context.started
    assert not context.alive
    assert context.stopped
    assert not context.xstatus
def test_lifecycle_of_graph_with_unrecoverable_error():
    """Check the lifecycle flags when the graph contains the second raiser.

    The graph chains ``[1, 2, 3]`` into ``raise_an_unrecoverrable_error`` and
    then ``print``. After the in-block assertions, ``context.loop()`` is
    called before the ``with`` block is left; the flags are then asserted
    again.
    """
    graph = Graph([1, 2, 3], raise_an_unrecoverrable_error, print)

    with GraphExecutionContext(graph) as context:
        # While the context manager is entered, before looping.
        assert context.started
        assert context.alive
        assert not context.stopped

        # Presumably drives the graph so the raising step is executed --
        # TODO confirm against GraphExecutionContext.loop.
        context.loop()

    # After the context manager has exited.
    assert context.started
    assert not context.alive
    assert context.stopped
    assert not context.xstatus