diff --git a/bonobo/execution/contexts/graph.py b/bonobo/execution/contexts/graph.py index b2362db..2828592 100644 --- a/bonobo/execution/contexts/graph.py +++ b/bonobo/execution/contexts/graph.py @@ -148,3 +148,11 @@ class GraphExecutionContext(BaseContext): def unregister_plugins(self): for plugin_context in self.plugins: plugin_context.unregister() + + + @property + def xstatus(self): + """ + UNIX-like exit status, only coherent if the context has stopped. + """ + return max(node.xstatus for node in self.nodes) diff --git a/bonobo/execution/contexts/node.py b/bonobo/execution/contexts/node.py index 13c1ff7..b5884a7 100644 --- a/bonobo/execution/contexts/node.py +++ b/bonobo/execution/contexts/node.py @@ -123,8 +123,6 @@ class NodeExecutionContext(BaseContext, WithStatistics): self.step() except InactiveReadableError: break - except Empty: - sleep(TICK_PERIOD) # XXX: How do we determine this constant? logger.debug('Node loop ends for {!r}.'.format(self)) @@ -133,6 +131,8 @@ class NodeExecutionContext(BaseContext, WithStatistics): self._step() except InactiveReadableError: raise + except Empty: + sleep(TICK_PERIOD) # XXX: How do we determine this constant? except ( NotImplementedError, UnrecoverableError, @@ -294,12 +294,18 @@ class NodeExecutionContext(BaseContext, WithStatistics): # Store or check input type if self._input_type is None: self._input_type = type(input_bag) - elif type(input_bag) is not self._input_type: - raise UnrecoverableTypeError( - 'Input type changed between calls to {!r}.\nGot {!r} which is not of type {!r}.'.format( - self.wrapped, input_bag, self._input_type - ) - ) + elif type(input_bag) != self._input_type: + try: + if self._input_type == tuple: + input_bag = self._input_type(input_bag) + else: + input_bag = self._input_type(*input_bag) + except Exception as exc: + raise UnrecoverableTypeError( + 'Input type changed to incompatible type between calls to {!r}.\nGot {!r} which is not of type {!r}.'.format( + self.wrapped, input_bag, self._input_type + ) + ) from exc # Store or check input length, which is a soft fallback in case we're just using tuples if self._input_length is None: diff --git a/bonobo/nodes/aggregation.py b/bonobo/nodes/aggregation.py new file mode 100644 index 0000000..f09cc17 --- /dev/null +++ b/bonobo/nodes/aggregation.py @@ -0,0 +1,16 @@ +from bonobo.config import Configurable, Method, Option, ContextProcessor, use_raw_input +from bonobo.util import ValueHolder + + +class Reduce(Configurable): + function = Method() + initializer = Option(required=False) + + @ContextProcessor + def buffer(self, context): + values = yield ValueHolder(self.initializer() if callable(self.initializer) else self.initializer) + context.send(values.get()) + + @use_raw_input + def __call__(self, values, bag): + values.set(self.function(values.get(), bag))