Simplification of node execution context, handle_result is now in step() as it is the only logical place where this will actually be called.
This commit is contained in:
@ -1,5 +1,6 @@
|
|||||||
import logging
|
import logging
|
||||||
import sys
|
import sys
|
||||||
|
import warnings
|
||||||
from queue import Empty
|
from queue import Empty
|
||||||
from time import sleep
|
from time import sleep
|
||||||
from types import GeneratorType
|
from types import GeneratorType
|
||||||
@ -27,7 +28,7 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
|
|||||||
|
|
||||||
def __init__(self, wrapped, parent=None, services=None, _input=None, _outputs=None):
|
def __init__(self, wrapped, parent=None, services=None, _input=None, _outputs=None):
|
||||||
LoopingExecutionContext.__init__(self, wrapped, parent=parent, services=services)
|
LoopingExecutionContext.__init__(self, wrapped, parent=parent, services=services)
|
||||||
WithStatistics.__init__(self, 'in', 'out', 'err')
|
WithStatistics.__init__(self, 'in', 'out', 'err', 'warn')
|
||||||
|
|
||||||
self.input = _input or Input()
|
self.input = _input or Input()
|
||||||
self.outputs = _outputs or []
|
self.outputs = _outputs or []
|
||||||
@ -125,19 +126,8 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
|
|||||||
|
|
||||||
input_bag = self.get()
|
input_bag = self.get()
|
||||||
|
|
||||||
# todo add timer
|
results = input_bag.apply(self._stack)
|
||||||
self.handle_results(input_bag, input_bag.apply(self._stack))
|
|
||||||
|
|
||||||
def kill(self):
|
|
||||||
if not self.started:
|
|
||||||
raise RuntimeError('Cannot kill a node context that has not started yet.')
|
|
||||||
|
|
||||||
if self.stopped:
|
|
||||||
raise RuntimeError('Cannot kill a node context that has already stopped.')
|
|
||||||
|
|
||||||
self._killed = True
|
|
||||||
|
|
||||||
def handle_results(self, input_bag, results):
|
|
||||||
# self._exec_time += timer.duration
|
# self._exec_time += timer.duration
|
||||||
# Put data onto output channels
|
# Put data onto output channels
|
||||||
|
|
||||||
@ -159,6 +149,15 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
|
|||||||
# self._exec_count += 1
|
# self._exec_count += 1
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def kill(self):
|
||||||
|
if not self.started:
|
||||||
|
raise RuntimeError('Cannot kill a node context that has not started yet.')
|
||||||
|
|
||||||
|
if self.stopped:
|
||||||
|
raise RuntimeError('Cannot kill a node context that has already stopped.')
|
||||||
|
|
||||||
|
self._killed = True
|
||||||
|
|
||||||
def as_dict(self):
|
def as_dict(self):
|
||||||
return {
|
return {
|
||||||
'status': self.status,
|
'status': self.status,
|
||||||
@ -169,7 +168,7 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
|
|||||||
|
|
||||||
|
|
||||||
def isflag(param):
|
def isflag(param):
|
||||||
return isinstance(param, Token) and param in (NOT_MODIFIED, )
|
return isinstance(param, Token) and param in (NOT_MODIFIED,)
|
||||||
|
|
||||||
|
|
||||||
def split_tokens(output):
|
def split_tokens(output):
|
||||||
@ -181,11 +180,11 @@ def split_tokens(output):
|
|||||||
"""
|
"""
|
||||||
if isinstance(output, Token):
|
if isinstance(output, Token):
|
||||||
# just a flag
|
# just a flag
|
||||||
return (output, ), ()
|
return (output,), ()
|
||||||
|
|
||||||
if not istuple(output):
|
if not istuple(output):
|
||||||
# no flag
|
# no flag
|
||||||
return (), (output, )
|
return (), (output,)
|
||||||
|
|
||||||
i = 0
|
i = 0
|
||||||
while isflag(output[i]):
|
while isflag(output[i]):
|
||||||
|
|||||||
@ -61,12 +61,9 @@ class CsvReader(FileReader, CsvHandler):
|
|||||||
for _ in range(0, self.skip):
|
for _ in range(0, self.skip):
|
||||||
next(reader)
|
next(reader)
|
||||||
|
|
||||||
for row in reader:
|
for lineno, row in enumerate(reader):
|
||||||
if len(row) != field_count:
|
if len(row) != field_count:
|
||||||
warnings.warn('Got a line with %d fields, expecting %d.' % (
|
warnings.warn('Got %d fields on line #%d, expecting %d.' % (len(row), lineno, field_count,))
|
||||||
len(row),
|
|
||||||
field_count,
|
|
||||||
))
|
|
||||||
|
|
||||||
yield dict(zip(_headers, row))
|
yield dict(zip(_headers, row))
|
||||||
|
|
||||||
|
|||||||
@ -28,8 +28,8 @@ class WithStatistics:
|
|||||||
stats = tuple('{0}={1}'.format(name, cnt) for name, cnt in self.get_statistics(*args, **kwargs) if cnt > 0)
|
stats = tuple('{0}={1}'.format(name, cnt) for name, cnt in self.get_statistics(*args, **kwargs) if cnt > 0)
|
||||||
return (kwargs.get('prefix', '') + ' '.join(stats)) if len(stats) else ''
|
return (kwargs.get('prefix', '') + ' '.join(stats)) if len(stats) else ''
|
||||||
|
|
||||||
def increment(self, name):
|
def increment(self, name, *, amount=1):
|
||||||
self.statistics[name] += 1
|
self.statistics[name] += amount
|
||||||
|
|
||||||
|
|
||||||
class Timer:
|
class Timer:
|
||||||
|
|||||||
Reference in New Issue
Block a user