First implementation of services and basic injection. Not working with CLI for now.
This commit is contained in:
8
bonobo/strategies/__init__.py
Normal file
8
bonobo/strategies/__init__.py
Normal file
@ -0,0 +1,8 @@
|
||||
from bonobo.strategies.executor import ThreadPoolExecutorStrategy, ProcessPoolExecutorStrategy
|
||||
from bonobo.strategies.naive import NaiveStrategy
|
||||
|
||||
__all__ = [
|
||||
'NaiveStrategy',
|
||||
'ProcessPoolExecutorStrategy',
|
||||
'ThreadPoolExecutorStrategy',
|
||||
]
|
||||
15
bonobo/strategies/base.py
Normal file
15
bonobo/strategies/base.py
Normal file
@ -0,0 +1,15 @@
|
||||
from bonobo.execution import GraphExecutionContext
|
||||
|
||||
|
||||
class Strategy:
|
||||
"""
|
||||
Base class for execution strategies.
|
||||
|
||||
"""
|
||||
graph_execution_context_factory = GraphExecutionContext
|
||||
|
||||
def create_graph_execution_context(self, graph, *args, **kwargs):
|
||||
return self.graph_execution_context_factory(graph, *args, **kwargs)
|
||||
|
||||
def execute(self, graph, *args, **kwargs):
|
||||
raise NotImplementedError
|
||||
62
bonobo/strategies/executor.py
Normal file
62
bonobo/strategies/executor.py
Normal file
@ -0,0 +1,62 @@
|
||||
import time
|
||||
|
||||
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
|
||||
|
||||
from bonobo.constants import BEGIN, END
|
||||
from bonobo.strategies.base import Strategy
|
||||
from bonobo.structs.bags import Bag
|
||||
|
||||
|
||||
class ExecutorStrategy(Strategy):
|
||||
"""
|
||||
Strategy based on a concurrent.futures.Executor subclass (or similar interface).
|
||||
|
||||
"""
|
||||
|
||||
executor_factory = Executor
|
||||
|
||||
def create_executor(self):
|
||||
return self.executor_factory()
|
||||
|
||||
def execute(self, graph, *args, plugins=None, services=None, **kwargs):
|
||||
context = self.create_graph_execution_context(graph, plugins=plugins, services=services)
|
||||
context.recv(BEGIN, Bag(), END)
|
||||
|
||||
executor = self.create_executor()
|
||||
|
||||
futures = []
|
||||
|
||||
for plugin_context in context.plugins:
|
||||
|
||||
def _runner(plugin_context=plugin_context):
|
||||
plugin_context.start()
|
||||
plugin_context.loop()
|
||||
plugin_context.stop()
|
||||
|
||||
futures.append(executor.submit(_runner))
|
||||
|
||||
for node_context in context.nodes:
|
||||
|
||||
def _runner(node_context=node_context):
|
||||
node_context.start()
|
||||
node_context.loop()
|
||||
|
||||
futures.append(executor.submit(_runner))
|
||||
|
||||
while context.alive:
|
||||
time.sleep(0.2)
|
||||
|
||||
for plugin_context in context.plugins:
|
||||
plugin_context.shutdown()
|
||||
|
||||
executor.shutdown()
|
||||
|
||||
return context
|
||||
|
||||
|
||||
class ThreadPoolExecutorStrategy(ExecutorStrategy):
|
||||
executor_factory = ThreadPoolExecutor
|
||||
|
||||
|
||||
class ProcessPoolExecutorStrategy(ExecutorStrategy):
|
||||
executor_factory = ProcessPoolExecutor
|
||||
16
bonobo/strategies/naive.py
Normal file
16
bonobo/strategies/naive.py
Normal file
@ -0,0 +1,16 @@
|
||||
from bonobo.constants import BEGIN, END
|
||||
from bonobo.strategies.base import Strategy
|
||||
from bonobo.structs.bags import Bag
|
||||
|
||||
|
||||
class NaiveStrategy(Strategy):
|
||||
def execute(self, graph, *args, plugins=None, **kwargs):
|
||||
context = self.create_graph_execution_context(graph, plugins=plugins)
|
||||
context.recv(BEGIN, Bag(), END)
|
||||
|
||||
# TODO: how to run plugins in "naive" mode ?
|
||||
context.start()
|
||||
context.loop()
|
||||
context.stop()
|
||||
|
||||
return context
|
||||
1
bonobo/strategies/util.py
Normal file
1
bonobo/strategies/util.py
Normal file
@ -0,0 +1 @@
|
||||
|
||||
Reference in New Issue
Block a user