diff --git a/.gitignore b/.gitignore index 42f67b6..a00ffb4 100644 --- a/.gitignore +++ b/.gitignore @@ -31,6 +31,7 @@ /doc/_build/ /downloads/ /eggs/ +/examples/private /htmlcov/ /sdist/ celerybeat-schedule diff --git a/.travis.yml b/.travis.yml index e7ed28c..3909f43 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,7 @@ language: python python: - 3.5 + - 3.6 - nightly install: - make install-dev diff --git a/bonobo/core/contexts.py b/bonobo/core/contexts.py index 5090ab7..398eac3 100644 --- a/bonobo/core/contexts.py +++ b/bonobo/core/contexts.py @@ -1,5 +1,4 @@ import traceback -import types from functools import partial from queue import Empty from time import sleep @@ -8,7 +7,6 @@ from bonobo.core.errors import InactiveReadableError from bonobo.core.inputs import Input from bonobo.core.stats import WithStatistics from bonobo.util.lifecycle import get_initializer, get_finalizer -from bonobo.util.time import Timer from bonobo.util.tokens import BEGIN, END, NEW, RUNNING, TERMINATED @@ -72,6 +70,12 @@ class PluginExecutionContext: self.alive = False +def iterable(x): + if isinstance(x, (dict, list, str)): + raise TypeError(type(x).__name__) + return iter(x) + + class ComponentExecutionContext(WithStatistics): """ todo: make the counter dependant of parent context? @@ -140,7 +144,7 @@ class ComponentExecutionContext(WithStatistics): return self.component(self, *args) return self.component(*args) - def step(self, finalize=False): + def step(self): # Pull data from the first available input channel. """Runs a transformation callable with given args/kwargs and flush the result into the right output channel.""" @@ -152,24 +156,22 @@ class ComponentExecutionContext(WithStatistics): # self._exec_time += timer.duration # Put data onto output channels - if isinstance(results, types.GeneratorType): + try: + results = iterable(results) + except TypeError: + if results: + self.send(results) + else: + # case with no result, an execution went through anyway, use for stats. + # self._exec_count += 1 + pass + else: while True: - # timer = Timer() - # with timer: - # todo _next ? try: result = next(results) except StopIteration as e: break - # self._exec_time += timer.duration - # self._exec_count += 1 self.send(result) - elif results is not None: - # self._exec_count += 1 - self.send(results) - else: - pass - # self._exec_count += 1 def run(self): assert self.state is NEW, ('A {} can only be run once, and thus is expected to be in {} state at the '