From 3c4010f9c3113ac27c4f36f482b0d4c3c9604fb6 Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Sat, 17 Jun 2017 10:17:42 +0200 Subject: [PATCH] [core] Execution contexts are now context managers. --- bonobo/execution/base.py | 7 +++++++ bonobo/nodes/io/csv.py | 2 +- bonobo/strategies/executor.py | 14 +++++--------- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/bonobo/execution/base.py b/bonobo/execution/base.py index 779f212..641d761 100644 --- a/bonobo/execution/base.py +++ b/bonobo/execution/base.py @@ -58,6 +58,13 @@ class LoopingExecutionContext(Wrapper): # XXX enhancers self._enhancers = get_enhancers(self.wrapped) + def __enter__(self): + self.start() + return self + + def __exit__(self, exc_type=None, exc_val=None, exc_tb=None): + self.stop() + def start(self): if self.started: raise RuntimeError('Cannot start a node twice ({}).'.format(get_name(self))) diff --git a/bonobo/nodes/io/csv.py b/bonobo/nodes/io/csv.py index da70444..bf3872d 100644 --- a/bonobo/nodes/io/csv.py +++ b/bonobo/nodes/io/csv.py @@ -59,7 +59,7 @@ class CsvReader(CsvHandler, FileReader): for row in reader: if len(row) != field_count: - raise ValueError('Got a line with %d fields, expecting %d.' % (len(row), field_count, )) + raise ValueError('Got a line with %d fields, expecting %d.' % (len(row), field_count,)) yield self.get_output(dict(zip(_headers, row))) diff --git a/bonobo/strategies/executor.py b/bonobo/strategies/executor.py index 26b810b..d2cdcbe 100644 --- a/bonobo/strategies/executor.py +++ b/bonobo/strategies/executor.py @@ -1,6 +1,5 @@ import time import traceback - from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor from bonobo.constants import BEGIN, END @@ -29,19 +28,16 @@ class ExecutorStrategy(Strategy): futures = [] for plugin_context in context.plugins: - def _runner(plugin_context=plugin_context): - try: - plugin_context.start() - plugin_context.loop() - plugin_context.stop() - except Exception as exc: - print_error(exc, traceback.format_exc(), context=plugin_context) + with plugin_context: + try: + plugin_context.loop() + except Exception as exc: + print_error(exc, traceback.format_exc(), context=plugin_context) futures.append(executor.submit(_runner)) for node_context in context.nodes: - def _runner(node_context=node_context): try: node_context.start()