diff --git a/bonobo/execution/contexts/base.py b/bonobo/execution/contexts/base.py index 4647d91..b7f07c5 100644 --- a/bonobo/execution/contexts/base.py +++ b/bonobo/execution/contexts/base.py @@ -99,9 +99,6 @@ class Lifecycle: self._stopped = True - if self._stopped: # Stopping twice has no effect - return - def kill(self): if not self.started: raise RuntimeError('Cannot kill an unstarted context.') @@ -136,3 +133,12 @@ class BaseContext(Lifecycle, Wrapper): Lifecycle.__init__(self) Wrapper.__init__(self, wrapped) self.parent = parent + + @property + def xstatus(self): + """ + UNIX-like exit status, only coherent if the context has stopped. + """ + if self._defunct: + return 70 + return 0 diff --git a/bonobo/execution/contexts/graph.py b/bonobo/execution/contexts/graph.py index a6559a3..3c029b5 100644 --- a/bonobo/execution/contexts/graph.py +++ b/bonobo/execution/contexts/graph.py @@ -4,12 +4,17 @@ from time import sleep from bonobo.config import create_container from bonobo.constants import BEGIN, END from bonobo.execution import events +from bonobo.execution.contexts.base import BaseContext from bonobo.execution.contexts.node import NodeExecutionContext from bonobo.execution.contexts.plugin import PluginExecutionContext from whistle import EventDispatcher -class GraphExecutionContext: +class GraphExecutionContext(BaseContext): + """ + Stores the actual state of a graph execution, and manages its lifecycle. + + """ NodeExecutionContextType = NodeExecutionContext PluginExecutionContextType = PluginExecutionContext @@ -17,17 +22,24 @@ class GraphExecutionContext: @property def started(self): + if not len(self.nodes): + return super(GraphExecutionContext, self).started return any(node.started for node in self.nodes) @property def stopped(self): + if not len(self.nodes): + return super(GraphExecutionContext, self).stopped return all(node.started and node.stopped for node in self.nodes) @property def alive(self): + if not len(self.nodes): + return super(GraphExecutionContext, self).alive return any(node.alive for node in self.nodes) - def __init__(self, graph, plugins=None, services=None, dispatcher=None): + def __init__(self, graph, *, plugins=None, services=None, dispatcher=None): + super(GraphExecutionContext, self).__init__(graph) self.dispatcher = dispatcher or EventDispatcher() self.graph = graph self.nodes = [self.create_node_execution_context_for(node) for node in self.graph] @@ -74,6 +86,8 @@ class GraphExecutionContext: self.dispatcher.dispatch(name, events.ExecutionEvent(self)) def start(self, starter=None): + super(GraphExecutionContext, self).start() + self.register_plugins() self.dispatch(events.START) self.tick(pause=False) @@ -89,13 +103,16 @@ class GraphExecutionContext: if pause: sleep(self.TICK_PERIOD) - def kill(self): - self.dispatch(events.KILL) - for node_context in self.nodes: - node_context.kill() - self.tick() + def loop(self): + while self.should_loop: + self.tick() + for node in self.nodes: + if node.should_loop: + node.step() def stop(self, stopper=None): + super(GraphExecutionContext, self).stop() + self.dispatch(events.STOP) for node_context in self.nodes: if stopper is None: @@ -106,6 +123,14 @@ class GraphExecutionContext: self.dispatch(events.STOPPED) self.unregister_plugins() + def kill(self): + super(GraphExecutionContext, self).kill() + + self.dispatch(events.KILL) + for node_context in self.nodes: + node_context.kill() + self.tick() + def register_plugins(self): for plugin_context in self.plugins: plugin_context.register() diff --git a/bonobo/execution/contexts/node.py b/bonobo/execution/contexts/node.py index cf9ec67..07cbf16 100644 --- a/bonobo/execution/contexts/node.py +++ b/bonobo/execution/contexts/node.py @@ -21,6 +21,15 @@ UnboundArguments = namedtuple('UnboundArguments', ['args', 'kwargs']) class NodeExecutionContext(BaseContext, WithStatistics): + """ + Stores the actual context of a node, within a given graph execution, accessed as `self.parent`. + + A special case exist, mostly for testing purpose, where there is no parent context. + + Can be used as a context manager, also very convenient for testing nodes that requires some external context (like + a service implementation, or a value holder). + + """ def __init__(self, wrapped, *, parent=None, services=None, _input=None, _outputs=None): """ Node execution context has the responsibility fo storing the state of a transformation during its execution. @@ -170,7 +179,7 @@ class NodeExecutionContext(BaseContext, WithStatistics): if self._stack: try: self._stack.teardown() - except: + except Exception as exc: self.fatal(sys.exc_info()) super().stop() diff --git a/tests/execution/contexts/test_execution_contexts_graph.py b/tests/execution/contexts/test_execution_contexts_graph.py new file mode 100644 index 0000000..95b1ea4 --- /dev/null +++ b/tests/execution/contexts/test_execution_contexts_graph.py @@ -0,0 +1,59 @@ +from bonobo import Graph +from bonobo.execution.contexts import GraphExecutionContext + + +def raise_an_error(*args, **kwargs): + raise Exception('Careful, man, there\'s a beverage here!') + + +def raise_an_unrecoverrable_error(*args, **kwargs): + raise Exception('You are entering a world of pain!') + + +def test_lifecycle_of_empty_graph(): + graph = Graph() + with GraphExecutionContext(graph) as context: + assert context.started + assert context.alive + assert not context.stopped + assert context.started + assert not context.alive + assert context.stopped + assert not context.xstatus + + +def test_lifecycle_of_nonempty_graph(): + graph = Graph([1, 2, 3], print) + with GraphExecutionContext(graph) as context: + assert context.started + assert context.alive + assert not context.stopped + assert context.started + assert not context.alive + assert context.stopped + assert not context.xstatus + + +def test_lifecycle_of_graph_with_recoverable_error(): + graph = Graph([1, 2, 3], raise_an_error, print) + with GraphExecutionContext(graph) as context: + assert context.started + assert context.alive + assert not context.stopped + assert context.started + assert not context.alive + assert context.stopped + assert not context.xstatus + + +def test_lifecycle_of_graph_with_unrecoverable_error(): + graph = Graph([1, 2, 3], raise_an_unrecoverrable_error, print) + with GraphExecutionContext(graph) as context: + assert context.started + assert context.alive + assert not context.stopped + context.loop() + assert context.started + assert not context.alive + assert context.stopped + assert not context.xstatus