[core] Execution contexts are now context managers.

This commit is contained in:
Romain Dorgueil
2017-06-17 10:17:42 +02:00
parent a65ca635cf
commit 3c4010f9c3
3 changed files with 13 additions and 10 deletions

View File

@ -58,6 +58,13 @@ class LoopingExecutionContext(Wrapper):
# XXX enhancers
self._enhancers = get_enhancers(self.wrapped)
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type=None, exc_val=None, exc_tb=None):
self.stop()
def start(self):
if self.started:
raise RuntimeError('Cannot start a node twice ({}).'.format(get_name(self)))

View File

@ -59,7 +59,7 @@ class CsvReader(CsvHandler, FileReader):
for row in reader:
if len(row) != field_count:
raise ValueError('Got a line with %d fields, expecting %d.' % (len(row), field_count, ))
raise ValueError('Got a line with %d fields, expecting %d.' % (len(row), field_count,))
yield self.get_output(dict(zip(_headers, row)))

View File

@ -1,6 +1,5 @@
import time
import traceback
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from bonobo.constants import BEGIN, END
@ -29,19 +28,16 @@ class ExecutorStrategy(Strategy):
futures = []
for plugin_context in context.plugins:
def _runner(plugin_context=plugin_context):
try:
plugin_context.start()
plugin_context.loop()
plugin_context.stop()
except Exception as exc:
print_error(exc, traceback.format_exc(), context=plugin_context)
with plugin_context:
try:
plugin_context.loop()
except Exception as exc:
print_error(exc, traceback.format_exc(), context=plugin_context)
futures.append(executor.submit(_runner))
for node_context in context.nodes:
def _runner(node_context=node_context):
try:
node_context.start()