Merge pull request #9 from hartym/master
context refactoring, json writer as class.
This commit is contained in:
@ -35,95 +35,100 @@ class ExecutionContext:
|
|||||||
def __iter__(self):
|
def __iter__(self):
|
||||||
yield from self.components
|
yield from self.components
|
||||||
|
|
||||||
def impulse(self):
|
def recv(self, *messages):
|
||||||
|
"""Push a list of messages in the inputs of this graph's inputs, matching the output of special node "BEGIN" in
|
||||||
|
our graph."""
|
||||||
|
|
||||||
for i in self.graph.outputs_of(BEGIN):
|
for i in self.graph.outputs_of(BEGIN):
|
||||||
self[i].recv(BEGIN)
|
for message in messages:
|
||||||
self[i].recv(Bag())
|
self[i].recv(message)
|
||||||
self[i].recv(END)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def running(self):
|
def alive(self):
|
||||||
return any(component.running for component in self.components)
|
return any(component.alive for component in self.components)
|
||||||
|
|
||||||
|
|
||||||
class PluginExecutionContext:
|
class AbstractLoopContext:
|
||||||
def __init__(self, plugin, parent):
|
alive = True
|
||||||
self.parent = parent
|
PERIOD = 0.25
|
||||||
self.plugin = plugin
|
|
||||||
self.alive = True
|
def __init__(self, wrapped):
|
||||||
|
self.wrapped = wrapped
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
self.initialize()
|
||||||
|
self.loop()
|
||||||
|
self.finalize()
|
||||||
|
|
||||||
def initialize(self):
|
def initialize(self):
|
||||||
# pylint: disable=broad-except
|
# pylint: disable=broad-except
|
||||||
try:
|
try:
|
||||||
get_initializer(self.plugin)(self)
|
get_initializer(self.wrapped)(self)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
self.handle_error(exc, traceback.format_exc())
|
self.handle_error(exc, traceback.format_exc())
|
||||||
|
|
||||||
|
def loop(self):
|
||||||
|
"""Generic loop. A bit boring. """
|
||||||
|
while self.alive:
|
||||||
|
self._loop()
|
||||||
|
sleep(self.PERIOD)
|
||||||
|
|
||||||
|
def _loop(self):
|
||||||
|
"""
|
||||||
|
TODO xxx this is a step, not a loop
|
||||||
|
"""
|
||||||
|
raise NotImplementedError('Abstract.')
|
||||||
|
|
||||||
def finalize(self):
|
def finalize(self):
|
||||||
|
"""Generic finalizer. """
|
||||||
# pylint: disable=broad-except
|
# pylint: disable=broad-except
|
||||||
try:
|
try:
|
||||||
get_finalizer(self.plugin)(self)
|
get_finalizer(self.wrapped)(self)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
self.handle_error(exc, traceback.format_exc())
|
self.handle_error(exc, traceback.format_exc())
|
||||||
|
|
||||||
def run(self):
|
def handle_error(self, exc, trace):
|
||||||
self.initialize()
|
"""
|
||||||
|
Error handler. Whatever happens in a plugin or component, if it looks like an exception, tastes like an exception
|
||||||
|
or somehow makes me think it is an exception, I'll handle it.
|
||||||
|
|
||||||
while self.alive:
|
:param exc: the culprit
|
||||||
# todo with wrap_errors ....
|
:param trace: Hercule Poirot's logbook.
|
||||||
|
:return: to hell
|
||||||
|
"""
|
||||||
|
print('\U0001F4A3 {} in {}'.format(type(exc).__name__, self.wrapped))
|
||||||
|
print(trace)
|
||||||
|
|
||||||
try:
|
|
||||||
self.plugin.run(self)
|
|
||||||
except Exception as exc: # pylint: disable=broad-except
|
|
||||||
self.handle_error(exc, traceback.format_exc())
|
|
||||||
|
|
||||||
sleep(0.25)
|
class PluginExecutionContext(AbstractLoopContext):
|
||||||
|
def __init__(self, plugin, parent):
|
||||||
self.finalize()
|
self.plugin = plugin
|
||||||
|
self.parent = parent
|
||||||
|
super().__init__(self.plugin)
|
||||||
|
|
||||||
def shutdown(self):
|
def shutdown(self):
|
||||||
self.alive = False
|
self.alive = False
|
||||||
|
|
||||||
def handle_error(self, exc, trace):
|
def _loop(self):
|
||||||
print('\U0001F4A3 {} in plugin {}'.format(type(exc).__name__, self.plugin))
|
try:
|
||||||
print(trace)
|
self.wrapped.run(self)
|
||||||
|
except Exception as exc: # pylint: disable=broad-except
|
||||||
|
self.handle_error(exc, traceback.format_exc())
|
||||||
|
|
||||||
|
|
||||||
def _iter(mixed):
|
class ComponentExecutionContext(WithStatistics, AbstractLoopContext):
|
||||||
if isinstance(mixed, (dict, list, str)):
|
|
||||||
raise TypeError(type(mixed).__name__)
|
|
||||||
return iter(mixed)
|
|
||||||
|
|
||||||
|
|
||||||
def _resolve(input_bag, output):
|
|
||||||
# NotModified means to send the input unmodified to output.
|
|
||||||
if output is NOT_MODIFIED:
|
|
||||||
return input_bag
|
|
||||||
|
|
||||||
# If it does not look like a bag, let's create one for easier manipulation
|
|
||||||
if hasattr(output, 'apply'):
|
|
||||||
# Already a bag? Check if we need to set parent.
|
|
||||||
if INHERIT_INPUT in output.flags:
|
|
||||||
output.set_parent(input_bag)
|
|
||||||
else:
|
|
||||||
# Not a bag? Let's encapsulate it.
|
|
||||||
output = Bag(output)
|
|
||||||
|
|
||||||
return output
|
|
||||||
|
|
||||||
|
|
||||||
class ComponentExecutionContext(WithStatistics):
|
|
||||||
"""
|
"""
|
||||||
todo: make the counter dependent on the parent context?
|
todo: make the counter dependent on the parent context?
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def name(self):
|
def alive(self):
|
||||||
return self.component.__name__
|
"""todo check if this is right, and where it is used"""
|
||||||
|
return self.input.alive
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def running(self):
|
def name(self):
|
||||||
return self.input.alive
|
return self.component.__name__
|
||||||
|
|
||||||
def __init__(self, component, parent):
|
def __init__(self, component, parent):
|
||||||
self.parent = parent
|
self.parent = parent
|
||||||
@ -139,9 +144,11 @@ class ComponentExecutionContext(WithStatistics):
|
|||||||
'write': 0,
|
'write': 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
super().__init__(self.component)
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
"""Adds "alive" information to the transform representation."""
|
"""Adds "alive" information to the transform representation."""
|
||||||
return ('+' if self.running else '-') + ' ' + self.name + ' ' + self.get_stats_as_string()
|
return ('+' if self.alive else '-') + ' ' + self.name + ' ' + self.get_stats_as_string()
|
||||||
|
|
||||||
def get_stats(self, *args, **kwargs):
|
def get_stats(self, *args, **kwargs):
|
||||||
return (
|
return (
|
||||||
@ -155,24 +162,33 @@ class ComponentExecutionContext(WithStatistics):
|
|||||||
'err',
|
'err',
|
||||||
self.stats['err'], ), )
|
self.stats['err'], ), )
|
||||||
|
|
||||||
def impulse(self):
|
def recv(self, *messages):
|
||||||
self.input.put(None)
|
"""
|
||||||
|
Push a message list to this context's input queue.
|
||||||
|
|
||||||
|
:param mixed messages: messages to put on the input queue, in the order given
|
||||||
|
"""
|
||||||
|
for message in messages:
|
||||||
|
self.input.put(message)
|
||||||
|
|
||||||
def send(self, value, _control=False):
|
def send(self, value, _control=False):
|
||||||
|
"""
|
||||||
|
Sends a message to all of this context's outputs.
|
||||||
|
|
||||||
|
:param mixed value: message
|
||||||
|
:param _control: if true, won't count in statistics.
|
||||||
|
"""
|
||||||
if not _control:
|
if not _control:
|
||||||
self.stats['out'] += 1
|
self.stats['out'] += 1
|
||||||
for output in self.outputs:
|
for output in self.outputs:
|
||||||
output.put(value)
|
output.put(value)
|
||||||
|
|
||||||
def recv(self, value):
|
|
||||||
self.input.put(value)
|
|
||||||
|
|
||||||
def get(self):
|
def get(self):
|
||||||
"""
|
"""
|
||||||
Get from the queue first, then increment stats, so that if the queue raises Timeout or Empty, the stats are left unchanged.
|
Get from the queue first, then increment stats, so that if the queue raises Timeout or Empty, the stats are left unchanged.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
row = self.input.get(timeout=1)
|
row = self.input.get(timeout=self.PERIOD)
|
||||||
self.stats['in'] += 1
|
self.stats['in'] += 1
|
||||||
return row
|
return row
|
||||||
|
|
||||||
@ -182,6 +198,27 @@ class ComponentExecutionContext(WithStatistics):
|
|||||||
return bag.apply(self.component, self)
|
return bag.apply(self.component, self)
|
||||||
return bag.apply(self.component)
|
return bag.apply(self.component)
|
||||||
|
|
||||||
|
def initialize(self):
|
||||||
|
# pylint: disable=broad-except
|
||||||
|
assert self.state is NEW, ('A {} can only be run once, and thus is expected to be in {} state at '
|
||||||
|
'initialization time.').format(type(self).__name__, NEW)
|
||||||
|
self.state = RUNNING
|
||||||
|
super().initialize()
|
||||||
|
|
||||||
|
def loop(self):
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
self.step()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
raise
|
||||||
|
except InactiveReadableError:
|
||||||
|
break
|
||||||
|
except Empty:
|
||||||
|
sleep(self.PERIOD)
|
||||||
|
continue
|
||||||
|
except Exception as exc: # pylint: disable=broad-except
|
||||||
|
self.handle_error(exc, traceback.format_exc())
|
||||||
|
|
||||||
def step(self):
|
def step(self):
|
||||||
# Pull data from the first available input channel.
|
# Pull data from the first available input channel.
|
||||||
"""Runs a transformation callable with given args/kwargs and flush the result into the right
|
"""Runs a transformation callable with given args/kwargs and flush the result into the right
|
||||||
@ -209,48 +246,33 @@ class ComponentExecutionContext(WithStatistics):
|
|||||||
break
|
break
|
||||||
self.send(_resolve(input_bag, output))
|
self.send(_resolve(input_bag, output))
|
||||||
|
|
||||||
def initialize(self):
|
|
||||||
# pylint: disable=broad-except
|
|
||||||
assert self.state is NEW, ('A {} can only be run once, and thus is expected to be in {} state at '
|
|
||||||
'initialization time.').format(type(self).__name__, NEW)
|
|
||||||
self.state = RUNNING
|
|
||||||
|
|
||||||
try:
|
|
||||||
get_initializer(self.component)(self)
|
|
||||||
except Exception as exc:
|
|
||||||
self.handle_error(exc, traceback.format_exc())
|
|
||||||
|
|
||||||
def finalize(self):
|
def finalize(self):
|
||||||
# pylint: disable=broad-except
|
# pylint: disable=broad-except
|
||||||
assert self.state is RUNNING, ('A {} must be in {} state at finalization time.').format(
|
assert self.state is RUNNING, ('A {} must be in {} state at finalization time.').format(
|
||||||
type(self).__name__, RUNNING)
|
type(self).__name__, RUNNING)
|
||||||
self.state = TERMINATED
|
self.state = TERMINATED
|
||||||
|
|
||||||
try:
|
super().finalize()
|
||||||
get_finalizer(self.component)(self)
|
|
||||||
except Exception as exc:
|
|
||||||
self.handle_error(exc, traceback.format_exc())
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
self.initialize()
|
|
||||||
|
|
||||||
while True:
|
def _iter(mixed):
|
||||||
try:
|
if isinstance(mixed, (dict, list, str)):
|
||||||
self.step()
|
raise TypeError(type(mixed).__name__)
|
||||||
except KeyboardInterrupt:
|
return iter(mixed)
|
||||||
raise
|
|
||||||
except InactiveReadableError:
|
|
||||||
sleep(1)
|
|
||||||
# Terminated, exit loop.
|
|
||||||
break # BREAK !!!
|
|
||||||
except Empty:
|
|
||||||
continue
|
|
||||||
except Exception as exc: # pylint: disable=broad-except
|
|
||||||
self.handle_error(exc, traceback.format_exc())
|
|
||||||
|
|
||||||
self.finalize()
|
|
||||||
|
|
||||||
def handle_error(self, exc, trace):
|
def _resolve(input_bag, output):
|
||||||
self.stats['err'] += 1
|
# NotModified means to send the input unmodified to output.
|
||||||
print('\U0001F4A3 {} in {}'.format(type(exc).__name__, self.component))
|
if output is NOT_MODIFIED:
|
||||||
print(trace)
|
return input_bag
|
||||||
|
|
||||||
|
# If it does not look like a bag, let's create one for easier manipulation
|
||||||
|
if hasattr(output, 'apply'):
|
||||||
|
# Already a bag? Check if we need to set parent.
|
||||||
|
if INHERIT_INPUT in output.flags:
|
||||||
|
output.set_parent(input_bag)
|
||||||
|
else:
|
||||||
|
# Not a bag? Let's encapsulate it.
|
||||||
|
output = Bag(output)
|
||||||
|
|
||||||
|
return output
|
||||||
|
|||||||
@ -4,6 +4,9 @@ from concurrent.futures import ProcessPoolExecutor
|
|||||||
from concurrent.futures import ThreadPoolExecutor
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
|
||||||
from bonobo.core.strategies.base import Strategy
|
from bonobo.core.strategies.base import Strategy
|
||||||
|
from bonobo.util.tokens import BEGIN, END
|
||||||
|
|
||||||
|
from ..bags import Bag
|
||||||
|
|
||||||
|
|
||||||
class ExecutorStrategy(Strategy):
|
class ExecutorStrategy(Strategy):
|
||||||
@ -16,7 +19,7 @@ class ExecutorStrategy(Strategy):
|
|||||||
|
|
||||||
def execute(self, graph, *args, plugins=None, **kwargs):
|
def execute(self, graph, *args, plugins=None, **kwargs):
|
||||||
context = self.create_context(graph, plugins=plugins)
|
context = self.create_context(graph, plugins=plugins)
|
||||||
context.impulse()
|
context.recv(BEGIN, Bag(), END)
|
||||||
|
|
||||||
executor = self.executor_factory()
|
executor = self.executor_factory()
|
||||||
|
|
||||||
@ -28,7 +31,7 @@ class ExecutorStrategy(Strategy):
|
|||||||
for component_context in context.components:
|
for component_context in context.components:
|
||||||
futures.append(executor.submit(component_context.run))
|
futures.append(executor.submit(component_context.run))
|
||||||
|
|
||||||
while context.running:
|
while context.alive:
|
||||||
time.sleep(0.2)
|
time.sleep(0.2)
|
||||||
|
|
||||||
for plugin_context in context.plugins:
|
for plugin_context in context.plugins:
|
||||||
|
|||||||
@ -1,10 +1,13 @@
|
|||||||
from bonobo.core.strategies.base import Strategy
|
from bonobo.core.strategies.base import Strategy
|
||||||
|
from bonobo.util.tokens import BEGIN, END
|
||||||
|
|
||||||
|
from ..bags import Bag
|
||||||
|
|
||||||
|
|
||||||
class NaiveStrategy(Strategy):
|
class NaiveStrategy(Strategy):
|
||||||
def execute(self, graph, *args, plugins=None, **kwargs):
|
def execute(self, graph, *args, plugins=None, **kwargs):
|
||||||
context = self.create_context(graph, plugins=plugins)
|
context = self.create_context(graph, plugins=plugins)
|
||||||
context.impulse()
|
context.recv(BEGIN, Bag(), END)
|
||||||
|
|
||||||
# TODO: how to run plugins in "naive" mode ?
|
# TODO: how to run plugins in "naive" mode ?
|
||||||
|
|
||||||
|
|||||||
@ -76,7 +76,7 @@ class ConsoleOutputPlugin(Plugin):
|
|||||||
t_cnt = len(context)
|
t_cnt = len(context)
|
||||||
|
|
||||||
for i, component in enumerate(context):
|
for i, component in enumerate(context):
|
||||||
if component.running:
|
if component.alive:
|
||||||
_line = ''.join((
|
_line = ''.join((
|
||||||
t.black('({})'.format(i + 1)),
|
t.black('({})'.format(i + 1)),
|
||||||
' ',
|
' ',
|
||||||
|
|||||||
@ -1,18 +1,25 @@
|
|||||||
import json
|
import json
|
||||||
|
|
||||||
from bonobo.util.lifecycle import with_context, set_initializer, set_finalizer
|
from bonobo.util.lifecycle import with_context
|
||||||
|
|
||||||
__all__ = ['to_json', ]
|
__all__ = [
|
||||||
|
'from_json',
|
||||||
|
'to_json',
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
def to_json(path_or_buf):
|
@with_context
|
||||||
# todo different cases + documentation
|
class JsonWriter:
|
||||||
# case 1: path_or_buf is str, we consider it filename, open and write
|
def __init__(self, path_or_buf):
|
||||||
# case 2: pob is None, json should be yielded
|
self.path_or_buf = path_or_buf
|
||||||
# case 3: pob is stream, filelike, write, gogog.
|
|
||||||
|
|
||||||
@with_context
|
def initialize(self, ctx):
|
||||||
def _to_json(ctx, row):
|
assert not hasattr(ctx, 'fp'), 'One at a time, baby.'
|
||||||
|
ctx.fp = open(self.path_or_buf, 'w+')
|
||||||
|
ctx.fp.write('[\n')
|
||||||
|
ctx.first = True
|
||||||
|
|
||||||
|
def __call__(self, ctx, row):
|
||||||
if ctx.first:
|
if ctx.first:
|
||||||
prefix = ''
|
prefix = ''
|
||||||
ctx.first = False
|
ctx.first = False
|
||||||
@ -20,19 +27,14 @@ def to_json(path_or_buf):
|
|||||||
prefix = ',\n'
|
prefix = ',\n'
|
||||||
ctx.fp.write(prefix + json.dumps(row))
|
ctx.fp.write(prefix + json.dumps(row))
|
||||||
|
|
||||||
@set_initializer(_to_json)
|
def finalize(self, ctx):
|
||||||
def _to_json_initialize(ctx):
|
|
||||||
assert not hasattr(ctx, 'fp'), 'One at a time, baby.'
|
|
||||||
ctx.fp = open(path_or_buf, 'w+')
|
|
||||||
ctx.fp.write('[\n')
|
|
||||||
ctx.first = True
|
|
||||||
|
|
||||||
@set_finalizer(_to_json)
|
|
||||||
def _to_json_finalize(ctx):
|
|
||||||
ctx.fp.write('\n]')
|
ctx.fp.write('\n]')
|
||||||
ctx.fp.close()
|
ctx.fp.close()
|
||||||
del ctx.fp, ctx.first
|
del ctx.fp, ctx.first
|
||||||
|
|
||||||
_to_json.__name__ = 'to_json'
|
|
||||||
|
|
||||||
return _to_json
|
def from_json(path_or_buf):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
to_json = JsonWriter
|
||||||
|
|||||||
@ -25,6 +25,10 @@ get_initializer, set_initializer = _create_lifecycle_functions('initializer', 'i
|
|||||||
get_finalizer, set_finalizer = _create_lifecycle_functions('finalizer', 'finalize')
|
get_finalizer, set_finalizer = _create_lifecycle_functions('finalizer', 'finalize')
|
||||||
|
|
||||||
|
|
||||||
def with_context(func):
|
class Contextual:
|
||||||
func._with_context = True
|
_with_context = True
|
||||||
return func
|
|
||||||
|
|
||||||
|
def with_context(cls_or_func):
|
||||||
|
cls_or_func._with_context = True
|
||||||
|
return cls_or_func
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
from bonobo import Graph, NaiveStrategy
|
from bonobo import Graph, NaiveStrategy, Bag
|
||||||
from bonobo.core.contexts import ExecutionContext
|
from bonobo.core.contexts import ExecutionContext
|
||||||
from bonobo.util.lifecycle import with_context
|
from bonobo.util.lifecycle import with_context
|
||||||
|
from bonobo.util.tokens import BEGIN, END
|
||||||
|
|
||||||
|
|
||||||
def generate_integers():
|
def generate_integers():
|
||||||
@ -28,7 +29,7 @@ def test_empty_execution_context():
|
|||||||
assert not len(ctx.components)
|
assert not len(ctx.components)
|
||||||
assert not len(ctx.plugins)
|
assert not len(ctx.plugins)
|
||||||
|
|
||||||
assert not ctx.running
|
assert not ctx.alive
|
||||||
|
|
||||||
|
|
||||||
def test_execution():
|
def test_execution():
|
||||||
@ -52,8 +53,8 @@ def test_simple_execution_context():
|
|||||||
for i, component in enumerate(chain):
|
for i, component in enumerate(chain):
|
||||||
assert ctx[i].component is component
|
assert ctx[i].component is component
|
||||||
|
|
||||||
assert not ctx.running
|
assert not ctx.alive
|
||||||
|
|
||||||
ctx.impulse()
|
ctx.recv(BEGIN, Bag(), END)
|
||||||
|
|
||||||
assert ctx.running
|
assert ctx.alive
|
||||||
|
|||||||
@ -1,21 +1,19 @@
|
|||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from bonobo import to_json
|
from bonobo import to_json, Bag
|
||||||
from bonobo.util.lifecycle import get_initializer, get_finalizer
|
from bonobo.core.contexts import ComponentExecutionContext
|
||||||
|
from bonobo.util.tokens import BEGIN, END
|
||||||
|
|
||||||
class ContextMock:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def test_write_json_to_file(tmpdir):
|
def test_write_json_to_file(tmpdir):
|
||||||
file = tmpdir.join('output.json')
|
file = tmpdir.join('output.json')
|
||||||
json_writer = to_json(str(file))
|
json_writer = to_json(str(file))
|
||||||
context = ContextMock()
|
context = ComponentExecutionContext(json_writer, None)
|
||||||
|
|
||||||
get_initializer(json_writer)(context)
|
context.initialize()
|
||||||
json_writer(context, {'foo': 'bar'})
|
context.recv(BEGIN, Bag({'foo': 'bar'}), END)
|
||||||
get_finalizer(json_writer)(context)
|
context.step()
|
||||||
|
context.finalize()
|
||||||
|
|
||||||
assert file.read() == '''[
|
assert file.read() == '''[
|
||||||
{"foo": "bar"}
|
{"foo": "bar"}
|
||||||
@ -32,6 +30,6 @@ def test_write_json_without_initializer_should_not_work(tmpdir):
|
|||||||
file = tmpdir.join('output.json')
|
file = tmpdir.join('output.json')
|
||||||
json_writer = to_json(str(file))
|
json_writer = to_json(str(file))
|
||||||
|
|
||||||
context = ContextMock()
|
context = ComponentExecutionContext(json_writer, None)
|
||||||
with pytest.raises(AttributeError):
|
with pytest.raises(AttributeError):
|
||||||
json_writer(context, {'foo': 'bar'})
|
json_writer(context, {'foo': 'bar'})
|
||||||
|
|||||||
Reference in New Issue
Block a user