refactoring contexts, moved json writer to a class as it make more sense.

This commit is contained in:
Romain Dorgueil
2016-12-27 09:06:44 +01:00
parent f37f178630
commit 512e2ab46d
8 changed files with 173 additions and 140 deletions

View File

@ -35,95 +35,100 @@ class ExecutionContext:
def __iter__(self): def __iter__(self):
yield from self.components yield from self.components
def impulse(self): def recv(self, *messages):
"""Push a list of messages in the inputs of this graph's inputs, matching the output of special node "BEGIN" in
our graph."""
for i in self.graph.outputs_of(BEGIN): for i in self.graph.outputs_of(BEGIN):
self[i].recv(BEGIN) for message in messages:
self[i].recv(Bag()) self[i].recv(message)
self[i].recv(END)
@property @property
def running(self): def alive(self):
return any(component.running for component in self.components) return any(component.alive for component in self.components)
class PluginExecutionContext: class AbstractLoopContext:
def __init__(self, plugin, parent): alive = True
self.parent = parent PERIOD = 0.25
self.plugin = plugin
self.alive = True def __init__(self, wrapped):
self.wrapped = wrapped
def run(self):
self.initialize()
self.loop()
self.finalize()
def initialize(self): def initialize(self):
# pylint: disable=broad-except # pylint: disable=broad-except
try: try:
get_initializer(self.plugin)(self) get_initializer(self.wrapped)(self)
except Exception as exc: except Exception as exc:
self.handle_error(exc, traceback.format_exc()) self.handle_error(exc, traceback.format_exc())
def loop(self):
"""Generic loop. A bit boring. """
while self.alive:
self._loop()
sleep(self.PERIOD)
def _loop(self):
"""
TODO xxx this is a step, not a loop
"""
raise NotImplementedError('Abstract.')
def finalize(self): def finalize(self):
"""Generic finalizer. """
# pylint: disable=broad-except # pylint: disable=broad-except
try: try:
get_finalizer(self.plugin)(self) get_finalizer(self.wrapped)(self)
except Exception as exc: except Exception as exc:
self.handle_error(exc, traceback.format_exc()) self.handle_error(exc, traceback.format_exc())
def run(self): def handle_error(self, exc, trace):
self.initialize() """
Error handler. Whatever happens in a plugin or component, if it looks like an exception, taste like an exception
or somehow make me think it is an exception, I'll handle it.
while self.alive: :param exc: the culprit
# todo with wrap_errors .... :param trace: Hercule Poirot's logbook.
:return: to hell
"""
print('\U0001F4A3 {} in {}'.format(type(exc).__name__, self.wrapped))
print(trace)
try:
self.plugin.run(self)
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
sleep(0.25) class PluginExecutionContext(AbstractLoopContext):
def __init__(self, plugin, parent):
self.finalize() self.plugin = plugin
self.parent = parent
super().__init__(self.plugin)
def shutdown(self): def shutdown(self):
self.alive = False self.alive = False
def handle_error(self, exc, trace): def _loop(self):
print('\U0001F4A3 {} in plugin {}'.format(type(exc).__name__, self.plugin)) try:
print(trace) self.wrapped.run(self)
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
def _iter(mixed): class ComponentExecutionContext(WithStatistics, AbstractLoopContext):
if isinstance(mixed, (dict, list, str)):
raise TypeError(type(mixed).__name__)
return iter(mixed)
def _resolve(input_bag, output):
# NotModified means to send the input unmodified to output.
if output is NOT_MODIFIED:
return input_bag
# If it does not look like a bag, let's create one for easier manipulation
if hasattr(output, 'apply'):
# Already a bag? Check if we need to set parent.
if INHERIT_INPUT in output.flags:
output.set_parent(input_bag)
else:
# Not a bag? Let's encapsulate it.
output = Bag(output)
return output
class ComponentExecutionContext(WithStatistics):
""" """
todo: make the counter dependant of parent context? todo: make the counter dependant of parent context?
""" """
@property @property
def name(self): def alive(self):
return self.component.__name__ """todo check if this is right, and where it is used"""
return self.input.alive
@property @property
def running(self): def name(self):
return self.input.alive return self.component.__name__
def __init__(self, component, parent): def __init__(self, component, parent):
self.parent = parent self.parent = parent
@ -139,9 +144,11 @@ class ComponentExecutionContext(WithStatistics):
'write': 0, 'write': 0,
} }
super().__init__(self.component)
def __repr__(self): def __repr__(self):
"""Adds "alive" information to the transform representation.""" """Adds "alive" information to the transform representation."""
return ('+' if self.running else '-') + ' ' + self.name + ' ' + self.get_stats_as_string() return ('+' if self.alive else '-') + ' ' + self.name + ' ' + self.get_stats_as_string()
def get_stats(self, *args, **kwargs): def get_stats(self, *args, **kwargs):
return ( return (
@ -155,24 +162,33 @@ class ComponentExecutionContext(WithStatistics):
'err', 'err',
self.stats['err'], ), ) self.stats['err'], ), )
def impulse(self): def recv(self, *messages):
self.input.put(None) """
Push a message list to this context's input queue.
:param mixed value: message
"""
for message in messages:
self.input.put(message)
def send(self, value, _control=False): def send(self, value, _control=False):
"""
Sends a message to all of this context's outputs.
:param mixed value: message
:param _control: if true, won't count in statistics.
"""
if not _control: if not _control:
self.stats['out'] += 1 self.stats['out'] += 1
for output in self.outputs: for output in self.outputs:
output.put(value) output.put(value)
def recv(self, value):
self.input.put(value)
def get(self): def get(self):
""" """
Get from the queue first, then increment stats, so if Queue raise Timeout or Empty, stat won't be changed. Get from the queue first, then increment stats, so if Queue raise Timeout or Empty, stat won't be changed.
""" """
row = self.input.get(timeout=1) row = self.input.get(timeout=self.PERIOD)
self.stats['in'] += 1 self.stats['in'] += 1
return row return row
@ -182,6 +198,27 @@ class ComponentExecutionContext(WithStatistics):
return bag.apply(self.component, self) return bag.apply(self.component, self)
return bag.apply(self.component) return bag.apply(self.component)
def initialize(self):
# pylint: disable=broad-except
assert self.state is NEW, ('A {} can only be run once, and thus is expected to be in {} state at '
'initialization time.').format(type(self).__name__, NEW)
self.state = RUNNING
super().initialize()
def loop(self):
while True:
try:
self.step()
except KeyboardInterrupt:
raise
except InactiveReadableError:
break
except Empty:
sleep(self.PERIOD)
continue
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
def step(self): def step(self):
# Pull data from the first available input channel. # Pull data from the first available input channel.
"""Runs a transformation callable with given args/kwargs and flush the result into the right """Runs a transformation callable with given args/kwargs and flush the result into the right
@ -209,48 +246,33 @@ class ComponentExecutionContext(WithStatistics):
break break
self.send(_resolve(input_bag, output)) self.send(_resolve(input_bag, output))
def initialize(self):
# pylint: disable=broad-except
assert self.state is NEW, ('A {} can only be run once, and thus is expected to be in {} state at '
'initialization time.').format(type(self).__name__, NEW)
self.state = RUNNING
try:
get_initializer(self.component)(self)
except Exception as exc:
self.handle_error(exc, traceback.format_exc())
def finalize(self): def finalize(self):
# pylint: disable=broad-except # pylint: disable=broad-except
assert self.state is RUNNING, ('A {} must be in {} state at finalization time.').format( assert self.state is RUNNING, ('A {} must be in {} state at finalization time.').format(
type(self).__name__, RUNNING) type(self).__name__, RUNNING)
self.state = TERMINATED self.state = TERMINATED
try: super().finalize()
get_finalizer(self.component)(self)
except Exception as exc:
self.handle_error(exc, traceback.format_exc())
def run(self):
self.initialize()
while True: def _iter(mixed):
try: if isinstance(mixed, (dict, list, str)):
self.step() raise TypeError(type(mixed).__name__)
except KeyboardInterrupt: return iter(mixed)
raise
except InactiveReadableError:
sleep(1)
# Terminated, exit loop.
break # BREAK !!!
except Empty:
continue
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
self.finalize()
def handle_error(self, exc, trace): def _resolve(input_bag, output):
self.stats['err'] += 1 # NotModified means to send the input unmodified to output.
print('\U0001F4A3 {} in {}'.format(type(exc).__name__, self.component)) if output is NOT_MODIFIED:
print(trace) return input_bag
# If it does not look like a bag, let's create one for easier manipulation
if hasattr(output, 'apply'):
# Already a bag? Check if we need to set parent.
if INHERIT_INPUT in output.flags:
output.set_parent(input_bag)
else:
# Not a bag? Let's encapsulate it.
output = Bag(output)
return output

View File

@ -4,6 +4,9 @@ from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from bonobo.core.strategies.base import Strategy from bonobo.core.strategies.base import Strategy
from bonobo.util.tokens import BEGIN, END
from ..bags import Bag
class ExecutorStrategy(Strategy): class ExecutorStrategy(Strategy):
@ -16,7 +19,7 @@ class ExecutorStrategy(Strategy):
def execute(self, graph, *args, plugins=None, **kwargs): def execute(self, graph, *args, plugins=None, **kwargs):
context = self.create_context(graph, plugins=plugins) context = self.create_context(graph, plugins=plugins)
context.impulse() context.recv(BEGIN, Bag(), END)
executor = self.executor_factory() executor = self.executor_factory()
@ -28,7 +31,7 @@ class ExecutorStrategy(Strategy):
for component_context in context.components: for component_context in context.components:
futures.append(executor.submit(component_context.run)) futures.append(executor.submit(component_context.run))
while context.running: while context.alive:
time.sleep(0.2) time.sleep(0.2)
for plugin_context in context.plugins: for plugin_context in context.plugins:

View File

@ -1,10 +1,13 @@
from bonobo.core.strategies.base import Strategy from bonobo.core.strategies.base import Strategy
from bonobo.util.tokens import BEGIN, END
from ..bags import Bag
class NaiveStrategy(Strategy): class NaiveStrategy(Strategy):
def execute(self, graph, *args, plugins=None, **kwargs): def execute(self, graph, *args, plugins=None, **kwargs):
context = self.create_context(graph, plugins=plugins) context = self.create_context(graph, plugins=plugins)
context.impulse() context.recv(BEGIN, Bag(), END)
# TODO: how to run plugins in "naive" mode ? # TODO: how to run plugins in "naive" mode ?

View File

@ -76,7 +76,7 @@ class ConsoleOutputPlugin(Plugin):
t_cnt = len(context) t_cnt = len(context)
for i, component in enumerate(context): for i, component in enumerate(context):
if component.running: if component.alive:
_line = ''.join(( _line = ''.join((
t.black('({})'.format(i + 1)), t.black('({})'.format(i + 1)),
' ', ' ',

View File

@ -1,18 +1,25 @@
import json import json
from bonobo.util.lifecycle import with_context, set_initializer, set_finalizer from bonobo.util.lifecycle import with_context
__all__ = ['to_json', ] __all__ = [
'from_json',
'to_json',
]
def to_json(path_or_buf): @with_context
# todo different cases + documentation class JsonWriter:
# case 1: path_or_buf is str, we consider it filename, open and write def __init__(self, path_or_buf):
# case 2: pob is None, json should be yielded self.path_or_buf = path_or_buf
# case 3: pob is stream, filelike, write, gogog.
@with_context def initialize(self, ctx):
def _to_json(ctx, row): assert not hasattr(ctx, 'fp'), 'One at a time, baby.'
ctx.fp = open(self.path_or_buf, 'w+')
ctx.fp.write('[\n')
ctx.first = True
def __call__(self, ctx, row):
if ctx.first: if ctx.first:
prefix = '' prefix = ''
ctx.first = False ctx.first = False
@ -20,19 +27,14 @@ def to_json(path_or_buf):
prefix = ',\n' prefix = ',\n'
ctx.fp.write(prefix + json.dumps(row)) ctx.fp.write(prefix + json.dumps(row))
@set_initializer(_to_json) def finalize(self, ctx):
def _to_json_initialize(ctx):
assert not hasattr(ctx, 'fp'), 'One at a time, baby.'
ctx.fp = open(path_or_buf, 'w+')
ctx.fp.write('[\n')
ctx.first = True
@set_finalizer(_to_json)
def _to_json_finalize(ctx):
ctx.fp.write('\n]') ctx.fp.write('\n]')
ctx.fp.close() ctx.fp.close()
del ctx.fp, ctx.first del ctx.fp, ctx.first
_to_json.__name__ = 'to_json'
return _to_json def from_json(path_or_buf):
pass
to_json = JsonWriter

View File

@ -25,6 +25,10 @@ get_initializer, set_initializer = _create_lifecycle_functions('initializer', 'i
get_finalizer, set_finalizer = _create_lifecycle_functions('finalizer', 'finalize') get_finalizer, set_finalizer = _create_lifecycle_functions('finalizer', 'finalize')
def with_context(func): class Contextual:
func._with_context = True _with_context = True
return func
def with_context(cls_or_func):
cls_or_func._with_context = True
return cls_or_func

View File

@ -1,6 +1,7 @@
from bonobo import Graph, NaiveStrategy from bonobo import Graph, NaiveStrategy, Bag
from bonobo.core.contexts import ExecutionContext from bonobo.core.contexts import ExecutionContext
from bonobo.util.lifecycle import with_context from bonobo.util.lifecycle import with_context
from bonobo.util.tokens import BEGIN, END
def generate_integers(): def generate_integers():
@ -28,7 +29,7 @@ def test_empty_execution_context():
assert not len(ctx.components) assert not len(ctx.components)
assert not len(ctx.plugins) assert not len(ctx.plugins)
assert not ctx.running assert not ctx.alive
def test_execution(): def test_execution():
@ -52,8 +53,8 @@ def test_simple_execution_context():
for i, component in enumerate(chain): for i, component in enumerate(chain):
assert ctx[i].component is component assert ctx[i].component is component
assert not ctx.running assert not ctx.alive
ctx.impulse() ctx.recv(BEGIN, Bag(), END)
assert ctx.running assert ctx.alive

View File

@ -1,21 +1,19 @@
import pytest import pytest
from bonobo import to_json from bonobo import to_json, Bag
from bonobo.util.lifecycle import get_initializer, get_finalizer from bonobo.core.contexts import ComponentExecutionContext
from bonobo.util.tokens import BEGIN, END
class ContextMock:
pass
def test_write_json_to_file(tmpdir): def test_write_json_to_file(tmpdir):
file = tmpdir.join('output.json') file = tmpdir.join('output.json')
json_writer = to_json(str(file)) json_writer = to_json(str(file))
context = ContextMock() context = ComponentExecutionContext(json_writer, None)
get_initializer(json_writer)(context) context.initialize()
json_writer(context, {'foo': 'bar'}) context.recv(BEGIN, Bag({'foo': 'bar'}), END)
get_finalizer(json_writer)(context) context.step()
context.finalize()
assert file.read() == '''[ assert file.read() == '''[
{"foo": "bar"} {"foo": "bar"}
@ -32,6 +30,6 @@ def test_write_json_without_initializer_should_not_work(tmpdir):
file = tmpdir.join('output.json') file = tmpdir.join('output.json')
json_writer = to_json(str(file)) json_writer = to_json(str(file))
context = ContextMock() context = ComponentExecutionContext(json_writer, None)
with pytest.raises(AttributeError): with pytest.raises(AttributeError):
json_writer(context, {'foo': 'bar'}) json_writer(context, {'foo': 'bar'})