Files
bonobo/tests/execution/contexts/test_execution_contexts_graph.py

60 lines
1.8 KiB
Python

from bonobo import Graph
from bonobo.constants import BEGIN, EMPTY, END
from bonobo.execution.contexts import GraphExecutionContext
def raise_an_error(*args, **kwargs):
raise Exception("Careful, man, there's a beverage here!")
def raise_an_unrecoverrable_error(*args, **kwargs):
raise Exception("You are entering a world of pain!")
def test_lifecycle_of_empty_graph():
graph = Graph()
with GraphExecutionContext(graph) as context:
assert context.started
assert context.alive
assert not context.stopped
assert context.started
assert not context.alive
assert context.stopped
assert not context.xstatus
def test_lifecycle_of_nonempty_graph():
graph = Graph([1, 2, 3], print)
with GraphExecutionContext(graph) as context:
assert context.started
assert context.alive
assert not context.stopped
assert context.started
assert not context.alive
assert context.stopped
assert not context.xstatus
def test_lifecycle_of_graph_with_recoverable_error():
graph = Graph([1, 2, 3], raise_an_error, print)
with GraphExecutionContext(graph) as context:
assert context.started
assert context.alive
assert not context.stopped
assert context.started
assert not context.alive
assert context.stopped
assert not context.xstatus
def test_lifecycle_of_graph_with_unrecoverable_error():
graph = Graph([1, 2, 3], raise_an_unrecoverrable_error, print)
with GraphExecutionContext(graph) as context:
assert context.started and context.alive and not context.stopped
context.write(BEGIN, EMPTY, END)
context.loop()
assert context.started
assert not context.alive
assert context.stopped
assert not context.xstatus