Merge pull request #220 from hartym/develop

Less strict CSV processing, to allow dirty input.
This commit is contained in:
Romain Dorgueil
2017-11-12 11:19:18 +01:00
committed by GitHub
3 changed files with 21 additions and 24 deletions

View File

@@ -1,5 +1,6 @@
import logging import logging
import sys import sys
import warnings
from queue import Empty from queue import Empty
from time import sleep from time import sleep
from types import GeneratorType from types import GeneratorType
@@ -27,7 +28,7 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
def __init__(self, wrapped, parent=None, services=None, _input=None, _outputs=None): def __init__(self, wrapped, parent=None, services=None, _input=None, _outputs=None):
LoopingExecutionContext.__init__(self, wrapped, parent=parent, services=services) LoopingExecutionContext.__init__(self, wrapped, parent=parent, services=services)
WithStatistics.__init__(self, 'in', 'out', 'err') WithStatistics.__init__(self, 'in', 'out', 'err', 'warn')
self.input = _input or Input() self.input = _input or Input()
self.outputs = _outputs or [] self.outputs = _outputs or []
@@ -125,19 +126,8 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
input_bag = self.get() input_bag = self.get()
# todo add timer results = input_bag.apply(self._stack)
self.handle_results(input_bag, input_bag.apply(self._stack))
def kill(self):
if not self.started:
raise RuntimeError('Cannot kill a node context that has not started yet.')
if self.stopped:
raise RuntimeError('Cannot kill a node context that has already stopped.')
self._killed = True
def handle_results(self, input_bag, results):
# self._exec_time += timer.duration # self._exec_time += timer.duration
# Put data onto output channels # Put data onto output channels
@@ -159,6 +149,15 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
# self._exec_count += 1 # self._exec_count += 1
pass pass
def kill(self):
if not self.started:
raise RuntimeError('Cannot kill a node context that has not started yet.')
if self.stopped:
raise RuntimeError('Cannot kill a node context that has already stopped.')
self._killed = True
def as_dict(self): def as_dict(self):
return { return {
'status': self.status, 'status': self.status,
@@ -169,7 +168,7 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
def isflag(param): def isflag(param):
return isinstance(param, Token) and param in (NOT_MODIFIED, ) return isinstance(param, Token) and param in (NOT_MODIFIED,)
def split_tokens(output): def split_tokens(output):
@@ -181,11 +180,11 @@ def split_tokens(output):
""" """
if isinstance(output, Token): if isinstance(output, Token):
# just a flag # just a flag
return (output, ), () return (output,), ()
if not istuple(output): if not istuple(output):
# no flag # no flag
return (), (output, ) return (), (output,)
i = 0 i = 0
while isflag(output[i]): while isflag(output[i]):

View File

@@ -1,4 +1,5 @@
import csv import csv
import warnings
from bonobo.config import Option from bonobo.config import Option
from bonobo.config.options import RemovedOption from bonobo.config.options import RemovedOption
@@ -60,12 +61,9 @@ class CsvReader(FileReader, CsvHandler):
for _ in range(0, self.skip): for _ in range(0, self.skip):
next(reader) next(reader)
for row in reader: for lineno, row in enumerate(reader):
if len(row) != field_count: if len(row) != field_count:
raise ValueError('Got a line with %d fields, expecting %d.' % ( warnings.warn('Got %d fields on line #%d, expecting %d.' % (len(row), lineno, field_count,))
len(row),
field_count,
))
yield dict(zip(_headers, row)) yield dict(zip(_headers, row))
@@ -81,6 +79,6 @@ class CsvWriter(FileWriter, CsvHandler):
if not lineno: if not lineno:
headers.set(headers.value or row.keys()) headers.set(headers.value or row.keys())
writer.writerow(headers.get()) writer.writerow(headers.get())
writer.writerow(row[header] for header in headers.get()) writer.writerow(row.get(header, '') for header in headers.get())
lineno += 1 lineno += 1
return NOT_MODIFIED return NOT_MODIFIED

View File

@@ -28,8 +28,8 @@ class WithStatistics:
stats = tuple('{0}={1}'.format(name, cnt) for name, cnt in self.get_statistics(*args, **kwargs) if cnt > 0) stats = tuple('{0}={1}'.format(name, cnt) for name, cnt in self.get_statistics(*args, **kwargs) if cnt > 0)
return (kwargs.get('prefix', '') + ' '.join(stats)) if len(stats) else '' return (kwargs.get('prefix', '') + ' '.join(stats)) if len(stats) else ''
def increment(self, name): def increment(self, name, *, amount=1):
self.statistics[name] += 1 self.statistics[name] += amount
class Timer: class Timer: