diff --git a/bonobo/execution/contexts/node.py b/bonobo/execution/contexts/node.py index 767b929..a18f4d7 100644 --- a/bonobo/execution/contexts/node.py +++ b/bonobo/execution/contexts/node.py @@ -1,5 +1,6 @@ import logging import sys +import warnings from queue import Empty from time import sleep from types import GeneratorType @@ -27,7 +28,7 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): def __init__(self, wrapped, parent=None, services=None, _input=None, _outputs=None): LoopingExecutionContext.__init__(self, wrapped, parent=parent, services=services) - WithStatistics.__init__(self, 'in', 'out', 'err') + WithStatistics.__init__(self, 'in', 'out', 'err', 'warn') self.input = _input or Input() self.outputs = _outputs or [] @@ -125,19 +126,8 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): input_bag = self.get() - # todo add timer - self.handle_results(input_bag, input_bag.apply(self._stack)) + results = input_bag.apply(self._stack) - def kill(self): - if not self.started: - raise RuntimeError('Cannot kill a node context that has not started yet.') - - if self.stopped: - raise RuntimeError('Cannot kill a node context that has already stopped.') - - self._killed = True - - def handle_results(self, input_bag, results): # self._exec_time += timer.duration # Put data onto output channels @@ -159,6 +149,15 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): # self._exec_count += 1 pass + def kill(self): + if not self.started: + raise RuntimeError('Cannot kill a node context that has not started yet.') + + if self.stopped: + raise RuntimeError('Cannot kill a node context that has already stopped.') + + self._killed = True + def as_dict(self): return { 'status': self.status, @@ -169,7 +168,7 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): def isflag(param): - return isinstance(param, Token) and param in (NOT_MODIFIED, ) + return isinstance(param, Token) and param in (NOT_MODIFIED,) def split_tokens(output): @@ -181,11 +180,11 @@ def split_tokens(output): """ if isinstance(output, Token): # just a flag - return (output, ), () + return (output,), () if not istuple(output): # no flag - return (), (output, ) + return (), (output,) i = 0 while isflag(output[i]): diff --git a/bonobo/nodes/io/csv.py b/bonobo/nodes/io/csv.py index 6f24df6..c846a5f 100644 --- a/bonobo/nodes/io/csv.py +++ b/bonobo/nodes/io/csv.py @@ -61,12 +61,9 @@ class CsvReader(FileReader, CsvHandler): for _ in range(0, self.skip): next(reader) - for row in reader: + for lineno, row in enumerate(reader): if len(row) != field_count: - warnings.warn('Got a line with %d fields, expecting %d.' % ( - len(row), - field_count, - )) + warnings.warn('Got %d fields on line #%d, expecting %d.' % (len(row), lineno, field_count,)) yield dict(zip(_headers, row)) diff --git a/bonobo/util/statistics.py b/bonobo/util/statistics.py index 2f9c5c2..31da8b9 100644 --- a/bonobo/util/statistics.py +++ b/bonobo/util/statistics.py @@ -28,8 +28,8 @@ class WithStatistics: stats = tuple('{0}={1}'.format(name, cnt) for name, cnt in self.get_statistics(*args, **kwargs) if cnt > 0) return (kwargs.get('prefix', '') + ' '.join(stats)) if len(stats) else '' - def increment(self, name): - self.statistics[name] += 1 + def increment(self, name, *, amount=1): + self.statistics[name] += amount class Timer: