work in progress ...

This commit is contained in:
Romain Dorgueil
2016-12-10 10:37:46 +01:00
parent 9853b9073d
commit f0315936d3
2 changed files with 154 additions and 17 deletions

View File

@ -1,21 +1,152 @@
import time import time
from concurrent.futures import Executor import traceback
from concurrent.futures import ThreadPoolExecutor
from queue import Queue, Empty from queue import Queue, Empty
from bonobo.core.errors import InactiveReadableError
from bonobo.core.io import Input from bonobo.core.io import Input
from bonobo.core.tokens import BEGIN from bonobo.core.tokens import BEGIN, Token
from bonobo.util.iterators import force_iterator from bonobo.util.iterators import force_iterator, IntegerSequenceGenerator
NEW = Token('New')
RUNNING = Token('Running')
TERMINATED = Token('Terminated')
def noop(*args, **kwargs): pass
def get_initializer(c):
return getattr(c, 'initialize', noop)
def get_finalizer(c):
return getattr(c, 'finalize', noop)
class ComponentExecutionContext:
"""
todo: make the counter dependant of parent context?
"""
# todo clean this xxx
__thread_counter = IntegerSequenceGenerator()
@property
def name(self):
return self.component.__name__ + '-' + str(self.__thread_number)
def __init__(self, component):
# todo clean this xxx
self.__thread_number = next(self.__class__.__thread_counter)
self.component = component
self.input = Input()
self.outputs = []
self.state = NEW
def __repr__(self):
"""Adds "alive" information to the transform representation."""
return ('+' if self.running else '-') + ' ' + self.name + ' ' + self.component.get_stats_as_string()
def step(self, finalize=False):
# Pull data from the first available input channel.
row = self.input.get(timeout=1)
self.__execute_and_handle_output(self.component, row)
def run(self):
assert self.state is NEW, ('A {} can only be run once, and thus is expected to be in {} state at the '
'beginning of a run().').format(type(self).__name__, NEW)
self.state = RUNNING
get_initializer(self.component)(self)
while True:
try:
self.step()
except KeyboardInterrupt as e:
raise
except InactiveReadableError as e:
# Terminated, exit loop.
break
except Empty as e:
continue
except Exception as e:
self.handle_error(e, traceback.format_exc())
assert self.state is RUNNING, ('A {} must be in {} state when finalization starts.').format(
type(self).__name__, RUNNING)
try:
self.state = TERMINATED
get_finalizer(self.component)(self)
except Exception as e:
self.handle_error(e, traceback.format_exc())
def handle_error(self, exc, tb):
raise NotImplementedError()
if STDERR in self.transform.OUTPUT_CHANNELS:
self.transform._output.put(({
'transform': self.transform,
'exception': exc,
'traceback': tb,
}, STDERR,))
print((str(exc) + '\n\n' + tb + '\n\n\n\n'))
else:
print((str(exc) + '\n\n' + tb + '\n\n\n\n'))
# Private
def __execute_and_handle_output(self, callable, *args, **kwargs):
"""Runs a transformation callable with given args/kwargs and flush the result into the right
output channel."""
timer = Timer()
with timer:
results = callable(*args, **kwargs)
self._exec_time += timer.duration
# Put data onto output channels
if isinstance(results, types.GeneratorType):
while True:
timer = Timer()
with timer:
try:
result = next(results)
except StopIteration as e:
break
self._exec_time += timer.duration
self._exec_count += 1
self._output.put(result)
elif results is not None:
self._exec_count += 1
self._output.put(results)
else:
self._exec_count += 1
class ExecutionContext:
def __init__(self, graph):
self.graph = graph
class Strategy: class Strategy:
context_type = ExecutionContext
def create_context(self, graph, *args, **kwargs):
return self.context_type(graph)
def execute(self, graph, *args, **kwargs): def execute(self, graph, *args, **kwargs):
raise NotImplementedError raise NotImplementedError
class NaiveStrategy(Strategy): class NaiveStrategy(Strategy):
def execute(self, graph, *args, **kwargs): def execute(self, graph, *args, **kwargs):
input_queues = {i: Queue() for i in range(len(graph.components))} context = self.create_context(graph)
for i, component in enumerate(graph.components):
input_queues = {i: Queue() for i in range(len(context.graph.components))}
for i, component in enumerate(context.graph.components):
while True: while True:
try: try:
args = (input_queues[i].get(block=False),) if i else () args = (input_queues[i].get(block=False),) if i else ()
@ -27,28 +158,20 @@ class NaiveStrategy(Strategy):
break break
class ExecutionContext:
def __init__(self, graph):
self.graph = graph
class ExecutorStrategy(Strategy): class ExecutorStrategy(Strategy):
context_type = ExecutionContext executor_type = ThreadPoolExecutor
executor_type = Executor
def __init__(self, executor=None): def __init__(self, executor=None):
self.executor = executor or self.executor_type() self.executor = executor or self.executor_type()
def create_context(self, graph, *args, **kwargs):
return self.context_type(graph)
def execute(self, graph, *args, **kwargs): def execute(self, graph, *args, **kwargs):
raise NotImplementedError()
context = self.create_context(graph) context = self.create_context(graph)
for i in graph.outputs_of(BEGIN): for i in graph.outputs_of(BEGIN):
self.call_component(i, *args, **kwargs) self.call_component(i, *args, **kwargs)
raise NotImplementedError()
while len(self.running): while len(self.running):
# print(self.running) # print(self.running)
time.sleep(0.1) time.sleep(0.1)

View File

@ -1,7 +1,21 @@
class IntegerSequenceGenerator:
"""Simple integer sequence generator."""
def __init__(self):
self.current = 0
def get(self):
return self.current
def __next__(self):
self.current += 1
return self.current
def force_iterator(x): def force_iterator(x):
if isinstance(x, str): if isinstance(x, str):
return [x] return [x]
try: try:
return iter(x) return iter(x)
except Exception as e: except Exception as e:
return [x] if x else [] return [x] if x else []