From e04c3dd84916a0b5cb08da9264104d76a4c5d6f5 Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Mon, 2 Oct 2017 09:14:10 +0200 Subject: [PATCH] Uniformisation of API for graph and node contexts (recv -> write), removing publication of LoopbackBag as this is not stable with current BEGIN/END implementation. --- bonobo/config/features.py | 0 bonobo/execution/graph.py | 12 ++++++------ bonobo/execution/node.py | 2 +- bonobo/strategies/executor.py | 4 ++-- bonobo/strategies/naive.py | 2 +- bonobo/structs/__init__.py | 3 +-- bonobo/structs/bags.py | 8 ++++---- tests/test_commands.py | 4 ++-- tests/test_execution.py | 2 +- 9 files changed, 18 insertions(+), 19 deletions(-) create mode 100644 bonobo/config/features.py diff --git a/bonobo/config/features.py b/bonobo/config/features.py new file mode 100644 index 0000000..e69de29 diff --git a/bonobo/execution/graph.py b/bonobo/execution/graph.py index 2c08589..1859adc 100644 --- a/bonobo/execution/graph.py +++ b/bonobo/execution/graph.py @@ -43,7 +43,7 @@ class GraphExecutionContext: def __iter__(self): yield from self.nodes - def recv(self, *messages): + def write(self, *messages): """Push a list of messages in the inputs of this graph's inputs, matching the output of special node "BEGIN" in our graph.""" @@ -56,12 +56,12 @@ class GraphExecutionContext: for node in self.nodes: node.start() - def loop(self): - # todo use strategy - for node in self.nodes: - node.loop() - def stop(self): # todo use strategy for node in self.nodes: node.stop() + + def loop(self): + # todo use strategy + for node in self.nodes: + node.loop() diff --git a/bonobo/execution/node.py b/bonobo/execution/node.py index 6f83f04..e8869ac 100644 --- a/bonobo/execution/node.py +++ b/bonobo/execution/node.py @@ -2,7 +2,7 @@ import traceback from queue import Empty from time import sleep -from bonobo.constants import INHERIT_INPUT, NOT_MODIFIED, BEGIN, END +from bonobo.constants import INHERIT_INPUT, NOT_MODIFIED from bonobo.errors import InactiveReadableError, UnrecoverableError from bonobo.execution.base import LoopingExecutionContext from bonobo.structs.bags import Bag diff --git a/bonobo/strategies/executor.py b/bonobo/strategies/executor.py index 44d206e..a0bd4f4 100644 --- a/bonobo/strategies/executor.py +++ b/bonobo/strategies/executor.py @@ -21,7 +21,7 @@ class ExecutorStrategy(Strategy): def execute(self, graph, *args, plugins=None, services=None, **kwargs): context = self.create_graph_execution_context(graph, plugins=plugins, services=services) - context.recv(BEGIN, Bag(), END) + context.write(BEGIN, Bag(), END) executor = self.create_executor() @@ -57,7 +57,7 @@ class ExecutorStrategy(Strategy): futures.append(executor.submit(_runner)) while context.alive: - time.sleep(0.2) + time.sleep(0.1) for plugin_context in context.plugins: plugin_context.shutdown() diff --git a/bonobo/strategies/naive.py b/bonobo/strategies/naive.py index b93a2e9..cab9c57 100644 --- a/bonobo/strategies/naive.py +++ b/bonobo/strategies/naive.py @@ -6,7 +6,7 @@ from bonobo.structs.bags import Bag class NaiveStrategy(Strategy): def execute(self, graph, *args, plugins=None, **kwargs): context = self.create_graph_execution_context(graph, plugins=plugins) - context.recv(BEGIN, Bag(), END) + context.write(BEGIN, Bag(), END) # TODO: how to run plugins in "naive" mode ? context.start() diff --git a/bonobo/structs/__init__.py b/bonobo/structs/__init__.py index 678cea1..6c0d9ab 100644 --- a/bonobo/structs/__init__.py +++ b/bonobo/structs/__init__.py @@ -1,4 +1,4 @@ -from bonobo.structs.bags import Bag, ErrorBag, LoopbackBag +from bonobo.structs.bags import Bag, ErrorBag from bonobo.structs.graphs import Graph from bonobo.structs.tokens import Token @@ -6,6 +6,5 @@ __all__ = [ 'Bag', 'ErrorBag', 'Graph', - 'LoopbackBag', 'Token', ] diff --git a/bonobo/structs/bags.py b/bonobo/structs/bags.py index 0c91274..3eae9ff 100644 --- a/bonobo/structs/bags.py +++ b/bonobo/structs/bags.py @@ -45,7 +45,7 @@ class Bag: def args(self): if self._parent is None: return self._args - return (*self._parent.args, *self._args,) + return (*self._parent.args, *self._args, ) @property def kwargs(self): @@ -93,7 +93,7 @@ class Bag: @classmethod def inherit(cls, *args, **kwargs): - return cls(*args, _flags=(INHERIT_INPUT,), **kwargs) + return cls(*args, _flags=(INHERIT_INPUT, ), **kwargs) def __eq__(self, other): return isinstance(other, Bag) and other.args == self.args and other.kwargs == self.kwargs @@ -101,7 +101,7 @@ class Bag: def __repr__(self): return '<{} ({})>'.format( type(self).__name__, ', '. - join(itertools.chain( + join(itertools.chain( map(repr, self.args), ('{}={}'.format(k, repr(v)) for k, v in self.kwargs.items()), )) @@ -109,7 +109,7 @@ class Bag: class LoopbackBag(Bag): - default_flags = (LOOPBACK,) + default_flags = (LOOPBACK, ) class ErrorBag(Bag): diff --git a/tests/test_commands.py b/tests/test_commands.py index 3f26e44..1fca75a 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -103,8 +103,8 @@ def test_version(runner, capsys): def test_run_with_env(runner, capsys): runner( 'run', '--quiet', - get_examples_path('env_vars/get_passed_env.py'), '--env', 'ENV_TEST_NUMBER=123', - '--env', 'ENV_TEST_USER=cwandrews', '--env', "ENV_TEST_STRING='my_test_string'" + get_examples_path('env_vars/get_passed_env.py'), '--env', 'ENV_TEST_NUMBER=123', '--env', + 'ENV_TEST_USER=cwandrews', '--env', "ENV_TEST_STRING='my_test_string'" ) out, err = capsys.readouterr() out = out.split('\n') diff --git a/tests/test_execution.py b/tests/test_execution.py index e6099fd..70e12ac 100644 --- a/tests/test_execution.py +++ b/tests/test_execution.py @@ -62,7 +62,7 @@ def test_simple_execution_context(): assert not ctx.started assert not ctx.stopped - ctx.recv(BEGIN, Bag(), END) + ctx.write(BEGIN, Bag(), END) assert not ctx.alive assert not ctx.started