diff --git a/bonobo/core/contexts.py b/bonobo/core/contexts.py index 0c0820a..1853c68 100644 --- a/bonobo/core/contexts.py +++ b/bonobo/core/contexts.py @@ -35,6 +35,12 @@ class ExecutionContext: def __iter__(self): yield from self.components + def impulse(self): + for i in self.graph.outputs_of(Begin): + self[i].recv(Begin) + self[i].recv(Bag()) + self[i].recv(End) + @property def running(self): return any(component.running for component in self.components) @@ -77,6 +83,23 @@ def _iter(x): return iter(x) +def _resolve(input_bag, output): + # NotModified means to send the input unmodified to output. + if output is NotModified: + return input_bag + + # If it does not look like a bag, let's create one for easier manipulation + if hasattr(output, 'apply'): + # Already a bag? Check if we need to set parent. + if InheritInputFlag in output.flags: + output.set_parent(input_bag) + else: + # Not a bag? Let's encapsulate it. + output = Bag(output) + + return output + + class ComponentExecutionContext(WithStatistics): """ todo: make the counter dependant of parent context? @@ -149,34 +172,15 @@ class ComponentExecutionContext(WithStatistics): output channel.""" input_bag = self.get() - - def _resolve(output): - nonlocal input_bag - - # NotModified means to send the input unmodified to output. - if output is NotModified: - return input_bag - - # If it does not look like a bag, let's create one for easier manipulation - if hasattr(output, 'apply'): - # Already a bag? Check if we need to set parent. - if InheritInputFlag in output.flags: - output.set_parent(input_bag) - else: - # Not a bag? Let's encapsulate it. - output = Bag(result) - - return output - - results = self._call(input_bag) + outputs = self._call(input_bag) # self._exec_time += timer.duration # Put data onto output channels try: - results = _iter(results) + outputs = _iter(outputs) except TypeError: - if results: - self.send(_resolve(results)) + if outputs: + self.send(_resolve(input_bag, outputs)) else: # case with no result, an execution went through anyway, use for stats. # self._exec_count += 1 @@ -184,10 +188,10 @@ class ComponentExecutionContext(WithStatistics): else: while True: try: - result = next(results) + output = next(outputs) except StopIteration as e: break - self.send(_resolve(result)) + self.send(_resolve(input_bag, output)) def run(self): assert self.state is New, ('A {} can only be run once, and thus is expected to be in {} state at the ' diff --git a/bonobo/core/strategies/executor.py b/bonobo/core/strategies/executor.py index 6c855eb..de4a99b 100644 --- a/bonobo/core/strategies/executor.py +++ b/bonobo/core/strategies/executor.py @@ -18,12 +18,9 @@ class ExecutorStrategy(Strategy): def execute(self, graph, *args, plugins=None, **kwargs): context = self.create_context(graph, plugins=plugins) - executor = self.executor_factory() + context.impulse() - for i in graph.outputs_of(Begin): - context[i].recv(Begin) - context[i].recv(Bag()) - context[i].recv(End) + executor = self.executor_factory() futures = [] @@ -41,8 +38,7 @@ class ExecutorStrategy(Strategy): executor.shutdown() - #for component_context in context.components: - # print(component_context) + return context class ThreadPoolExecutorStrategy(ExecutorStrategy): diff --git a/bonobo/core/strategies/naive.py b/bonobo/core/strategies/naive.py index 905bb06..fe78271 100644 --- a/bonobo/core/strategies/naive.py +++ b/bonobo/core/strategies/naive.py @@ -1,21 +1,14 @@ -from queue import Queue, Empty - from bonobo.core.strategies.base import Strategy -from bonobo.util.iterators import force_iterator class NaiveStrategy(Strategy): - def execute(self, graph, *args, **kwargs): - context = self.create_context(graph) + def execute(self, graph, *args, plugins=None, **kwargs): + context = self.create_context(graph, plugins=plugins) + context.impulse() - input_queues = {i: Queue() for i in range(len(context.graph.components))} - for i, component in enumerate(context.graph.components): - while True: - try: - args = (input_queues[i].get(block=False), ) if i else () - for row in force_iterator(component(*args)): - input_queues[i + 1].put(row) - if not i: - raise Empty - except Empty: - break + # TODO: how to run plugins in "naive" mode ? + + for component in context.components: + component.run() + + return context diff --git a/tests/core/test_contexts.py b/tests/core/test_contexts.py index e2e6179..30ee083 100644 --- a/tests/core/test_contexts.py +++ b/tests/core/test_contexts.py @@ -1,5 +1,6 @@ -from bonobo import Graph +from bonobo import Graph, NaiveStrategy from bonobo.core.contexts import ExecutionContext +from bonobo.util.lifecycle import with_context def generate_integers(): @@ -10,6 +11,16 @@ def square(i: int) -> int: return i**2 +@with_context +def push_result(ctx, i: int): + if not hasattr(ctx.parent, 'results'): + ctx.parent.results = [] + ctx.parent.results.append(i) + + +chain = (generate_integers, square, push_result) + + def test_empty_execution_context(): graph = Graph() @@ -20,15 +31,29 @@ def test_empty_execution_context(): assert not ctx.running +def test_execution(): + graph = Graph() + graph.add_chain(*chain) + + strategy = NaiveStrategy() + ctx = strategy.execute(graph) + + assert ctx.results == [1, 4, 9, 16, 25, 36, 49, 64, 81] + + def test_simple_execution_context(): graph = Graph() - graph.add_chain(generate_integers, square) + graph.add_chain(*chain) ctx = ExecutionContext(graph) - assert len(ctx.components) == 2 + assert len(ctx.components) == len(chain) assert not len(ctx.plugins) - assert ctx[0].component is generate_integers - assert ctx[1].component is square + for i, component in enumerate(chain): + assert ctx[i].component is component assert not ctx.running + + ctx.impulse() + + assert ctx.running