Uniformisation of API for graph and node contexts (recv -> write), removing publication of LoopbackBag as this is not stable with current BEGIN/END implementation.

This commit is contained in:
Romain Dorgueil
2017-10-02 09:14:10 +02:00
parent d35598fe8c
commit e04c3dd849
9 changed files with 18 additions and 19 deletions

View File

View File

@ -43,7 +43,7 @@ class GraphExecutionContext:
def __iter__(self): def __iter__(self):
yield from self.nodes yield from self.nodes
def recv(self, *messages): def write(self, *messages):
"""Push a list of messages in the inputs of this graph's inputs, matching the output of special node "BEGIN" in """Push a list of messages in the inputs of this graph's inputs, matching the output of special node "BEGIN" in
our graph.""" our graph."""
@ -56,12 +56,12 @@ class GraphExecutionContext:
for node in self.nodes: for node in self.nodes:
node.start() node.start()
def loop(self):
# todo use strategy
for node in self.nodes:
node.loop()
def stop(self): def stop(self):
# todo use strategy # todo use strategy
for node in self.nodes: for node in self.nodes:
node.stop() node.stop()
def loop(self):
# todo use strategy
for node in self.nodes:
node.loop()

View File

@ -2,7 +2,7 @@ import traceback
from queue import Empty from queue import Empty
from time import sleep from time import sleep
from bonobo.constants import INHERIT_INPUT, NOT_MODIFIED, BEGIN, END from bonobo.constants import INHERIT_INPUT, NOT_MODIFIED
from bonobo.errors import InactiveReadableError, UnrecoverableError from bonobo.errors import InactiveReadableError, UnrecoverableError
from bonobo.execution.base import LoopingExecutionContext from bonobo.execution.base import LoopingExecutionContext
from bonobo.structs.bags import Bag from bonobo.structs.bags import Bag

View File

@ -21,7 +21,7 @@ class ExecutorStrategy(Strategy):
def execute(self, graph, *args, plugins=None, services=None, **kwargs): def execute(self, graph, *args, plugins=None, services=None, **kwargs):
context = self.create_graph_execution_context(graph, plugins=plugins, services=services) context = self.create_graph_execution_context(graph, plugins=plugins, services=services)
context.recv(BEGIN, Bag(), END) context.write(BEGIN, Bag(), END)
executor = self.create_executor() executor = self.create_executor()
@ -57,7 +57,7 @@ class ExecutorStrategy(Strategy):
futures.append(executor.submit(_runner)) futures.append(executor.submit(_runner))
while context.alive: while context.alive:
time.sleep(0.2) time.sleep(0.1)
for plugin_context in context.plugins: for plugin_context in context.plugins:
plugin_context.shutdown() plugin_context.shutdown()

View File

@ -6,7 +6,7 @@ from bonobo.structs.bags import Bag
class NaiveStrategy(Strategy): class NaiveStrategy(Strategy):
def execute(self, graph, *args, plugins=None, **kwargs): def execute(self, graph, *args, plugins=None, **kwargs):
context = self.create_graph_execution_context(graph, plugins=plugins) context = self.create_graph_execution_context(graph, plugins=plugins)
context.recv(BEGIN, Bag(), END) context.write(BEGIN, Bag(), END)
# TODO: how to run plugins in "naive" mode ? # TODO: how to run plugins in "naive" mode ?
context.start() context.start()

View File

@ -1,4 +1,4 @@
from bonobo.structs.bags import Bag, ErrorBag, LoopbackBag from bonobo.structs.bags import Bag, ErrorBag
from bonobo.structs.graphs import Graph from bonobo.structs.graphs import Graph
from bonobo.structs.tokens import Token from bonobo.structs.tokens import Token
@ -6,6 +6,5 @@ __all__ = [
'Bag', 'Bag',
'ErrorBag', 'ErrorBag',
'Graph', 'Graph',
'LoopbackBag',
'Token', 'Token',
] ]

View File

@ -45,7 +45,7 @@ class Bag:
def args(self): def args(self):
if self._parent is None: if self._parent is None:
return self._args return self._args
return (*self._parent.args, *self._args,) return (*self._parent.args, *self._args, )
@property @property
def kwargs(self): def kwargs(self):
@ -93,7 +93,7 @@ class Bag:
@classmethod @classmethod
def inherit(cls, *args, **kwargs): def inherit(cls, *args, **kwargs):
return cls(*args, _flags=(INHERIT_INPUT,), **kwargs) return cls(*args, _flags=(INHERIT_INPUT, ), **kwargs)
def __eq__(self, other): def __eq__(self, other):
return isinstance(other, Bag) and other.args == self.args and other.kwargs == self.kwargs return isinstance(other, Bag) and other.args == self.args and other.kwargs == self.kwargs
@ -101,7 +101,7 @@ class Bag:
def __repr__(self): def __repr__(self):
return '<{} ({})>'.format( return '<{} ({})>'.format(
type(self).__name__, ', '. type(self).__name__, ', '.
join(itertools.chain( join(itertools.chain(
map(repr, self.args), map(repr, self.args),
('{}={}'.format(k, repr(v)) for k, v in self.kwargs.items()), ('{}={}'.format(k, repr(v)) for k, v in self.kwargs.items()),
)) ))
@ -109,7 +109,7 @@ class Bag:
class LoopbackBag(Bag): class LoopbackBag(Bag):
default_flags = (LOOPBACK,) default_flags = (LOOPBACK, )
class ErrorBag(Bag): class ErrorBag(Bag):

View File

@ -103,8 +103,8 @@ def test_version(runner, capsys):
def test_run_with_env(runner, capsys): def test_run_with_env(runner, capsys):
runner( runner(
'run', '--quiet', 'run', '--quiet',
get_examples_path('env_vars/get_passed_env.py'), '--env', 'ENV_TEST_NUMBER=123', get_examples_path('env_vars/get_passed_env.py'), '--env', 'ENV_TEST_NUMBER=123', '--env',
'--env', 'ENV_TEST_USER=cwandrews', '--env', "ENV_TEST_STRING='my_test_string'" 'ENV_TEST_USER=cwandrews', '--env', "ENV_TEST_STRING='my_test_string'"
) )
out, err = capsys.readouterr() out, err = capsys.readouterr()
out = out.split('\n') out = out.split('\n')

View File

@ -62,7 +62,7 @@ def test_simple_execution_context():
assert not ctx.started assert not ctx.started
assert not ctx.stopped assert not ctx.stopped
ctx.recv(BEGIN, Bag(), END) ctx.write(BEGIN, Bag(), END)
assert not ctx.alive assert not ctx.alive
assert not ctx.started assert not ctx.started