From 7b365e014d4941a2133a6690912c7645a030419a Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Mon, 16 Jul 2018 11:56:46 +0200 Subject: [PATCH 1/5] aggregates: Adds first version of Reduce() based on @levic work. --- bonobo/nodes/aggregation.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 bonobo/nodes/aggregation.py diff --git a/bonobo/nodes/aggregation.py b/bonobo/nodes/aggregation.py new file mode 100644 index 0000000..f09cc17 --- /dev/null +++ b/bonobo/nodes/aggregation.py @@ -0,0 +1,16 @@ +from bonobo.config import Configurable, Method, Option, ContextProcessor, use_raw_input +from bonobo.util import ValueHolder + + +class Reduce(Configurable): + function = Method() + initializer = Option(required=False) + + @ContextProcessor + def buffer(self, context): + values = yield ValueHolder(self.initializer() if callable(self.initializer) else self.initializer) + context.send(values.get()) + + @use_raw_input + def __call__(self, values, bag): + values.set(self.function(values.get(), bag)) From 71cd606fadfa6e07e556fc81203adee10005d1f8 Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Mon, 16 Jul 2018 12:12:15 +0200 Subject: [PATCH 2/5] experiment: try to autocast when possible --- bonobo/execution/contexts/node.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/bonobo/execution/contexts/node.py b/bonobo/execution/contexts/node.py index 07cbf16..a447261 100644 --- a/bonobo/execution/contexts/node.py +++ b/bonobo/execution/contexts/node.py @@ -286,11 +286,17 @@ class NodeExecutionContext(BaseContext, WithStatistics): if self._input_type is None: self._input_type = type(input_bag) elif type(input_bag) is not self._input_type: - raise UnrecoverableTypeError( - 'Input type changed between calls to {!r}.\nGot {!r} which is not of type {!r}.'.format( - self.wrapped, input_bag, self._input_type - ) - ) + try: + if type(self._input_type) == tuple: + input_bag = self._input_type(tuple) + else: + input_bag = self._input_type(*input_bag) + except Exception as exc: + raise UnrecoverableTypeError( + 'Input type changed to incompatible type between calls to {!r}.\nGot {!r} which is not of type {!r}.'.format( + self.wrapped, input_bag, self._input_type + ) + ) from exc # Store or check input length, which is a soft fallback in case we're just using tuples if self._input_length is None: From 5780b3648007caf3567d510018286cc8f7d959b8 Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Mon, 16 Jul 2018 12:53:15 +0200 Subject: [PATCH 3/5] experiment: try to autocast when possible --- bonobo/execution/contexts/node.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bonobo/execution/contexts/node.py b/bonobo/execution/contexts/node.py index a447261..e187b89 100644 --- a/bonobo/execution/contexts/node.py +++ b/bonobo/execution/contexts/node.py @@ -285,10 +285,10 @@ class NodeExecutionContext(BaseContext, WithStatistics): # Store or check input type if self._input_type is None: self._input_type = type(input_bag) - elif type(input_bag) is not self._input_type: + elif type(input_bag) != self._input_type: try: - if type(self._input_type) == tuple: - input_bag = self._input_type(tuple) + if self._input_type == tuple: + input_bag = self._input_type(input_bag) else: input_bag = self._input_type(*input_bag) except Exception as exc: From e0d714cbba2559fe43269a0f36280d982e4f080b Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Sun, 22 Jul 2018 07:58:30 +0200 Subject: [PATCH 4/5] bug: fix bad mistake in moving the "empty" catcher. --- bonobo/execution/contexts/node.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bonobo/execution/contexts/node.py b/bonobo/execution/contexts/node.py index 784dd8f..4c5cff7 100644 --- a/bonobo/execution/contexts/node.py +++ b/bonobo/execution/contexts/node.py @@ -123,8 +123,6 @@ class NodeExecutionContext(BaseContext, WithStatistics): self.step() except InactiveReadableError: break - except Empty: - sleep(TICK_PERIOD) # XXX: How do we determine this constant? logger.debug('Node loop ends for {!r}.'.format(self)) @@ -133,6 +131,8 @@ class NodeExecutionContext(BaseContext, WithStatistics): self._step() except InactiveReadableError: raise + except Empty: + sleep(TICK_PERIOD) # XXX: How do we determine this constant? except ( NotImplementedError, UnrecoverableError, From c449f8601e11df2d6c78e72cf8191d433fc185fe Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Sun, 22 Jul 2018 11:20:08 +0200 Subject: [PATCH 5/5] implements xstatus in graph context, based on node xstatus. --- bonobo/execution/contexts/graph.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/bonobo/execution/contexts/graph.py b/bonobo/execution/contexts/graph.py index b2362db..2828592 100644 --- a/bonobo/execution/contexts/graph.py +++ b/bonobo/execution/contexts/graph.py @@ -148,3 +148,11 @@ class GraphExecutionContext(BaseContext): def unregister_plugins(self): for plugin_context in self.plugins: plugin_context.unregister() + + + @property + def xstatus(self): + """ + UNIX-like exit status, only coherent if the context has stopped. + """ + return max(node.xstatus for node in self.nodes)