diff --git a/bonobo/execution/node.py b/bonobo/execution/node.py index 0657972..14bea86 100644 --- a/bonobo/execution/node.py +++ b/bonobo/execution/node.py @@ -88,6 +88,12 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): # todo add timer self.handle_results(input_bag, input_bag.apply(self.wrapped, *self._context)) + def push(self, bag): + # MAKE THIS PUBLIC API FOR CONTEXT PROCESSORS !!! + # xxx handle error or send in first call to apply(...)? + # xxx return value ? + bag.apply(self.handle_error) if is_error(bag) else self.send(bag) + def handle_results(self, input_bag, results): # self._exec_time += timer.duration # Put data onto output channels @@ -95,10 +101,7 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): results = iter_if_not_sequence(results) except TypeError: # not an iterator if results: - if isinstance(results, ErrorBag): - results.apply(self.handle_error) - else: - self.send(_resolve(input_bag, results)) + self.push(_resolve(input_bag, results)) else: # case with no result, an execution went through anyway, use for stats. # self._exec_count += 1 @@ -106,21 +109,23 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): else: while True: # iterator try: - output = next(results) + result = next(results) except StopIteration: break else: - if isinstance(output, ErrorBag): - output.apply(self.handle_error) - else: - self.send(_resolve(input_bag, output)) + self.push(_resolve(input_bag, result)) +def is_error(bag): + return isinstance(bag, ErrorBag) def _resolve(input_bag, output): # NotModified means to send the input unmodified to output. if output is NOT_MODIFIED: return input_bag + if is_error(output): + return output + # If it does not look like a bag, let's create one for easier manipulation if hasattr(output, 'apply'): # Already a bag? Check if we need to set parent.