minor changes an travis config for 3.6
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@ -31,6 +31,7 @@
|
||||
/doc/_build/
|
||||
/downloads/
|
||||
/eggs/
|
||||
/examples/private
|
||||
/htmlcov/
|
||||
/sdist/
|
||||
celerybeat-schedule
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
language: python
|
||||
python:
|
||||
- 3.5
|
||||
- 3.6
|
||||
- nightly
|
||||
install:
|
||||
- make install-dev
|
||||
|
||||
@ -1,5 +1,4 @@
|
||||
import traceback
|
||||
import types
|
||||
from functools import partial
|
||||
from queue import Empty
|
||||
from time import sleep
|
||||
@ -8,7 +7,6 @@ from bonobo.core.errors import InactiveReadableError
|
||||
from bonobo.core.inputs import Input
|
||||
from bonobo.core.stats import WithStatistics
|
||||
from bonobo.util.lifecycle import get_initializer, get_finalizer
|
||||
from bonobo.util.time import Timer
|
||||
from bonobo.util.tokens import BEGIN, END, NEW, RUNNING, TERMINATED
|
||||
|
||||
|
||||
@ -72,6 +70,12 @@ class PluginExecutionContext:
|
||||
self.alive = False
|
||||
|
||||
|
||||
def iterable(x):
|
||||
if isinstance(x, (dict, list, str)):
|
||||
raise TypeError(type(x).__name__)
|
||||
return iter(x)
|
||||
|
||||
|
||||
class ComponentExecutionContext(WithStatistics):
|
||||
"""
|
||||
todo: make the counter dependant of parent context?
|
||||
@ -140,7 +144,7 @@ class ComponentExecutionContext(WithStatistics):
|
||||
return self.component(self, *args)
|
||||
return self.component(*args)
|
||||
|
||||
def step(self, finalize=False):
|
||||
def step(self):
|
||||
# Pull data from the first available input channel.
|
||||
"""Runs a transformation callable with given args/kwargs and flush the result into the right
|
||||
output channel."""
|
||||
@ -152,24 +156,22 @@ class ComponentExecutionContext(WithStatistics):
|
||||
|
||||
# self._exec_time += timer.duration
|
||||
# Put data onto output channels
|
||||
if isinstance(results, types.GeneratorType):
|
||||
try:
|
||||
results = iterable(results)
|
||||
except TypeError:
|
||||
if results:
|
||||
self.send(results)
|
||||
else:
|
||||
# case with no result, an execution went through anyway, use for stats.
|
||||
# self._exec_count += 1
|
||||
pass
|
||||
else:
|
||||
while True:
|
||||
# timer = Timer()
|
||||
# with timer:
|
||||
# todo _next ?
|
||||
try:
|
||||
result = next(results)
|
||||
except StopIteration as e:
|
||||
break
|
||||
# self._exec_time += timer.duration
|
||||
# self._exec_count += 1
|
||||
self.send(result)
|
||||
elif results is not None:
|
||||
# self._exec_count += 1
|
||||
self.send(results)
|
||||
else:
|
||||
pass
|
||||
# self._exec_count += 1
|
||||
|
||||
def run(self):
|
||||
assert self.state is NEW, ('A {} can only be run once, and thus is expected to be in {} state at the '
|
||||
|
||||
Reference in New Issue
Block a user