Files
bonobo/bonobo/strategies/executor.py
Romain Dorgueil 92cc400fe7 [core] Refactoring IOFormats so there is one and only obvious way to send it.
This is the commit where I admit that having more than one input/output
format for readers and writers was complicating the code too much for a
very small gain, and that it would be easier to only have one way to do
it.

So such way is now:

- Returning (or yielding) a dict if you have key-value type collections.
- Returning (or yielding) a tuple if you have a list-type collection.
- Returning (or yielding) something else otherwise, which will continue
  to work like the old "arg0" format.

IOFORMAT options has been removed in favour of a RemovedOption, which
will complain if you're still trying to set it to anything else than the
one value allowed.
2017-10-15 21:37:22 +02:00

85 lines
2.3 KiB
Python

import time
import traceback
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from bonobo.constants import BEGIN, END
from bonobo.strategies.base import Strategy
from bonobo.structs.bags import Bag
from bonobo.util.errors import print_error
class ExecutorStrategy(Strategy):
"""
Strategy based on a concurrent.futures.Executor subclass (or similar interface).
"""
executor_factory = Executor
def create_executor(self):
return self.executor_factory()
def execute(self, graph, **kwargs):
context = self.create_graph_execution_context(graph, **kwargs)
context.write(BEGIN, Bag(), END)
executor = self.create_executor()
futures = []
context.start_plugins(self.get_plugin_starter(executor, futures))
context.start(self.get_starter(executor, futures))
while context.alive:
time.sleep(0.1)
for plugin_context in context.plugins:
plugin_context.shutdown()
context.stop()
executor.shutdown()
return context
def get_starter(self, executor, futures):
def starter(node):
def _runner():
try:
node.start()
except Exception as exc:
print_error(exc, traceback.format_exc(), context=node, method='start')
node.input.on_end()
else:
node.loop()
try:
node.stop()
except Exception as exc:
print_error(exc, traceback.format_exc(), context=node, method='stop')
futures.append(executor.submit(_runner))
return starter
def get_plugin_starter(self, executor, futures):
def plugin_starter(plugin):
def _runner():
with plugin:
try:
plugin.loop()
except Exception as exc:
print_error(exc, traceback.format_exc(), context=plugin)
futures.append(executor.submit(_runner))
return plugin_starter
class ThreadPoolExecutorStrategy(ExecutorStrategy):
executor_factory = ThreadPoolExecutor
class ProcessPoolExecutorStrategy(ExecutorStrategy):
executor_factory = ProcessPoolExecutor