Files
bonobo/bonobo/strategies/executor.py
Romain Dorgueil 1d27376785 release: 0.4.2
2017-06-18 21:35:30 +02:00

76 lines
2.2 KiB
Python

import time
import traceback
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from bonobo.constants import BEGIN, END
from bonobo.strategies.base import Strategy
from bonobo.structs.bags import Bag
from bonobo.util.errors import print_error
class ExecutorStrategy(Strategy):
    """
    Run a graph by handing every plugin and node context to an executor.

    The executor comes from ``executor_factory`` and is expected to follow the
    ``concurrent.futures.Executor`` interface; only ``submit()`` and
    ``shutdown()`` are used by this class.
    """

    # Zero-argument callable producing the executor. Concrete strategies
    # override this attribute with a usable executor class.
    executor_factory = Executor

    def create_executor(self):
        """Build and return a new executor by calling ``executor_factory`` with no arguments."""
        return self.executor_factory()

    def execute(self, graph, *args, plugins=None, services=None, **kwargs):
        """
        Execute ``graph`` and block until its execution context is no longer alive.

        :param graph: graph passed to ``create_graph_execution_context``.
        :param plugins: forwarded as-is to ``create_graph_execution_context``.
        :param services: forwarded as-is to ``create_graph_execution_context``.
        :return: the graph execution context that was created and run.

        Extra positional and keyword arguments are accepted but not used here.
        """

        def plugin_runner_for(plugin_context):
            # Bind the context through a factory so each submitted callable
            # keeps its own plugin context.
            def _runner():
                with plugin_context:
                    try:
                        plugin_context.loop()
                    except Exception as exc:
                        print_error(exc, traceback.format_exc(), context=plugin_context)

            return _runner

        def node_runner_for(node_context):
            def _runner():
                try:
                    node_context.start()
                except Exception as exc:
                    print_error(exc, traceback.format_exc(), context=node_context, method='start')
                    # The node could not start: signal end on its input instead of looping.
                    node_context.input.on_end()
                else:
                    node_context.loop()

                # stop() is attempted whether or not start() succeeded.
                try:
                    node_context.stop()
                except Exception as exc:
                    print_error(exc, traceback.format_exc(), context=node_context, method='stop')

            return _runner

        context = self.create_graph_execution_context(graph, plugins=plugins, services=services)
        # Send the initial BEGIN / empty Bag / END sequence to the context.
        context.recv(BEGIN, Bag(), END)

        executor = self.create_executor()

        # Plugins are submitted first, then nodes.
        futures = [executor.submit(plugin_runner_for(current)) for current in context.plugins]
        futures.extend(executor.submit(node_runner_for(current)) for current in context.nodes)

        # Poll until the execution context reports that it is no longer alive.
        poll_interval = 0.2
        while True:
            if not context.alive:
                break
            time.sleep(poll_interval)

        for finished_plugin in context.plugins:
            finished_plugin.shutdown()

        executor.shutdown()

        return context
class ThreadPoolExecutorStrategy(ExecutorStrategy):
    """Executor strategy whose executor is a ``concurrent.futures.ThreadPoolExecutor``."""

    # Executor class instantiated (with no arguments) by ``create_executor``.
    executor_factory = ThreadPoolExecutor
class ProcessPoolExecutorStrategy(ExecutorStrategy):
    """Executor strategy whose executor is a ``concurrent.futures.ProcessPoolExecutor``."""

    # Executor class instantiated (with no arguments) by ``create_executor``.
    executor_factory = ProcessPoolExecutor