Small refactoring of duplicate code.
This commit is contained in:
@ -88,6 +88,12 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
|
|||||||
# todo add timer
|
# todo add timer
|
||||||
self.handle_results(input_bag, input_bag.apply(self.wrapped, *self._context))
|
self.handle_results(input_bag, input_bag.apply(self.wrapped, *self._context))
|
||||||
|
|
||||||
|
def push(self, bag):
|
||||||
|
# MAKE THIS PUBLIC API FOR CONTEXT PROCESSORS !!!
|
||||||
|
# xxx handle error or send in first call to apply(...)?
|
||||||
|
# xxx return value ?
|
||||||
|
bag.apply(self.handle_error) if is_error(bag) else self.send(bag)
|
||||||
|
|
||||||
def handle_results(self, input_bag, results):
|
def handle_results(self, input_bag, results):
|
||||||
# self._exec_time += timer.duration
|
# self._exec_time += timer.duration
|
||||||
# Put data onto output channels
|
# Put data onto output channels
|
||||||
@ -95,10 +101,7 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
|
|||||||
results = iter_if_not_sequence(results)
|
results = iter_if_not_sequence(results)
|
||||||
except TypeError: # not an iterator
|
except TypeError: # not an iterator
|
||||||
if results:
|
if results:
|
||||||
if isinstance(results, ErrorBag):
|
self.push(_resolve(input_bag, results))
|
||||||
results.apply(self.handle_error)
|
|
||||||
else:
|
|
||||||
self.send(_resolve(input_bag, results))
|
|
||||||
else:
|
else:
|
||||||
# case with no result, an execution went through anyway, use for stats.
|
# case with no result, an execution went through anyway, use for stats.
|
||||||
# self._exec_count += 1
|
# self._exec_count += 1
|
||||||
@ -106,21 +109,23 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
|
|||||||
else:
|
else:
|
||||||
while True: # iterator
|
while True: # iterator
|
||||||
try:
|
try:
|
||||||
output = next(results)
|
result = next(results)
|
||||||
except StopIteration:
|
except StopIteration:
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
if isinstance(output, ErrorBag):
|
self.push(_resolve(input_bag, result))
|
||||||
output.apply(self.handle_error)
|
|
||||||
else:
|
|
||||||
self.send(_resolve(input_bag, output))
|
|
||||||
|
|
||||||
|
def is_error(bag):
|
||||||
|
return isinstance(bag, ErrorBag)
|
||||||
|
|
||||||
def _resolve(input_bag, output):
|
def _resolve(input_bag, output):
|
||||||
# NotModified means to send the input unmodified to output.
|
# NotModified means to send the input unmodified to output.
|
||||||
if output is NOT_MODIFIED:
|
if output is NOT_MODIFIED:
|
||||||
return input_bag
|
return input_bag
|
||||||
|
|
||||||
|
if is_error(output):
|
||||||
|
return output
|
||||||
|
|
||||||
# If it does not look like a bag, let's create one for easier manipulation
|
# If it does not look like a bag, let's create one for easier manipulation
|
||||||
if hasattr(output, 'apply'):
|
if hasattr(output, 'apply'):
|
||||||
# Already a bag? Check if we need to set parent.
|
# Already a bag? Check if we need to set parent.
|
||||||
|
|||||||
Reference in New Issue
Block a user