diff --git a/bonobo/core/contexts.py b/bonobo/core/contexts.py index 3a35241..7b0ec26 100644 --- a/bonobo/core/contexts.py +++ b/bonobo/core/contexts.py @@ -35,95 +35,100 @@ class ExecutionContext: def __iter__(self): yield from self.components - def impulse(self): + def recv(self, *messages): + """Push a list of messages in the inputs of this graph's inputs, matching the output of special node "BEGIN" in + our graph.""" + for i in self.graph.outputs_of(BEGIN): - self[i].recv(BEGIN) - self[i].recv(Bag()) - self[i].recv(END) + for message in messages: + self[i].recv(message) @property - def running(self): - return any(component.running for component in self.components) + def alive(self): + return any(component.alive for component in self.components) -class PluginExecutionContext: - def __init__(self, plugin, parent): - self.parent = parent - self.plugin = plugin - self.alive = True +class AbstractLoopContext: + alive = True + PERIOD = 0.25 + + def __init__(self, wrapped): + self.wrapped = wrapped + + def run(self): + self.initialize() + self.loop() + self.finalize() def initialize(self): # pylint: disable=broad-except try: - get_initializer(self.plugin)(self) + get_initializer(self.wrapped)(self) except Exception as exc: self.handle_error(exc, traceback.format_exc()) + def loop(self): + """Generic loop. A bit boring. """ + while self.alive: + self._loop() + sleep(self.PERIOD) + + def _loop(self): + """ + TODO xxx this is a step, not a loop + """ + raise NotImplementedError('Abstract.') + def finalize(self): + """Generic finalizer. """ # pylint: disable=broad-except try: - get_finalizer(self.plugin)(self) + get_finalizer(self.wrapped)(self) except Exception as exc: self.handle_error(exc, traceback.format_exc()) - def run(self): - self.initialize() + def handle_error(self, exc, trace): + """ + Error handler. Whatever happens in a plugin or component, if it looks like an exception, taste like an exception + or somehow make me think it is an exception, I'll handle it. - while self.alive: - # todo with wrap_errors .... + :param exc: the culprit + :param trace: Hercule Poirot's logbook. + :return: to hell + """ + print('\U0001F4A3 {} in {}'.format(type(exc).__name__, self.wrapped)) + print(trace) - try: - self.plugin.run(self) - except Exception as exc: # pylint: disable=broad-except - self.handle_error(exc, traceback.format_exc()) - sleep(0.25) - - self.finalize() +class PluginExecutionContext(AbstractLoopContext): + def __init__(self, plugin, parent): + self.plugin = plugin + self.parent = parent + super().__init__(self.plugin) def shutdown(self): self.alive = False - def handle_error(self, exc, trace): - print('\U0001F4A3 {} in plugin {}'.format(type(exc).__name__, self.plugin)) - print(trace) + def _loop(self): + try: + self.wrapped.run(self) + except Exception as exc: # pylint: disable=broad-except + self.handle_error(exc, traceback.format_exc()) -def _iter(mixed): - if isinstance(mixed, (dict, list, str)): - raise TypeError(type(mixed).__name__) - return iter(mixed) - - -def _resolve(input_bag, output): - # NotModified means to send the input unmodified to output. - if output is NOT_MODIFIED: - return input_bag - - # If it does not look like a bag, let's create one for easier manipulation - if hasattr(output, 'apply'): - # Already a bag? Check if we need to set parent. - if INHERIT_INPUT in output.flags: - output.set_parent(input_bag) - else: - # Not a bag? Let's encapsulate it. - output = Bag(output) - - return output - - -class ComponentExecutionContext(WithStatistics): +class ComponentExecutionContext(WithStatistics, AbstractLoopContext): """ todo: make the counter dependant of parent context? """ @property - def name(self): - return self.component.__name__ + def alive(self): + """todo check if this is right, and where it is used""" + return self.input.alive @property - def running(self): - return self.input.alive + def name(self): + return self.component.__name__ def __init__(self, component, parent): self.parent = parent @@ -139,9 +144,11 @@ class ComponentExecutionContext(WithStatistics): 'write': 0, } + super().__init__(self.component) + def __repr__(self): """Adds "alive" information to the transform representation.""" - return ('+' if self.running else '-') + ' ' + self.name + ' ' + self.get_stats_as_string() + return ('+' if self.alive else '-') + ' ' + self.name + ' ' + self.get_stats_as_string() def get_stats(self, *args, **kwargs): return ( @@ -155,24 +162,33 @@ class ComponentExecutionContext(WithStatistics): 'err', self.stats['err'], ), ) - def impulse(self): - self.input.put(None) + def recv(self, *messages): + """ + Push a message list to this context's input queue. + + :param mixed value: message + """ + for message in messages: + self.input.put(message) def send(self, value, _control=False): + """ + Sends a message to all of this context's outputs. + + :param mixed value: message + :param _control: if true, won't count in statistics. + """ if not _control: self.stats['out'] += 1 for output in self.outputs: output.put(value) - def recv(self, value): - self.input.put(value) - def get(self): """ Get from the queue first, then increment stats, so if Queue raise Timeout or Empty, stat won't be changed. """ - row = self.input.get(timeout=1) + row = self.input.get(timeout=self.PERIOD) self.stats['in'] += 1 return row @@ -182,6 +198,27 @@ class ComponentExecutionContext(WithStatistics): return bag.apply(self.component, self) return bag.apply(self.component) + def initialize(self): + # pylint: disable=broad-except + assert self.state is NEW, ('A {} can only be run once, and thus is expected to be in {} state at ' + 'initialization time.').format(type(self).__name__, NEW) + self.state = RUNNING + super().initialize() + + def loop(self): + while True: + try: + self.step() + except KeyboardInterrupt: + raise + except InactiveReadableError: + break + except Empty: + sleep(self.PERIOD) + continue + except Exception as exc: # pylint: disable=broad-except + self.handle_error(exc, traceback.format_exc()) + def step(self): # Pull data from the first available input channel. """Runs a transformation callable with given args/kwargs and flush the result into the right @@ -209,48 +246,33 @@ class ComponentExecutionContext(WithStatistics): break self.send(_resolve(input_bag, output)) - def initialize(self): - # pylint: disable=broad-except - assert self.state is NEW, ('A {} can only be run once, and thus is expected to be in {} state at ' - 'initialization time.').format(type(self).__name__, NEW) - self.state = RUNNING - - try: - get_initializer(self.component)(self) - except Exception as exc: - self.handle_error(exc, traceback.format_exc()) - def finalize(self): # pylint: disable=broad-except assert self.state is RUNNING, ('A {} must be in {} state at finalization time.').format( type(self).__name__, RUNNING) self.state = TERMINATED - try: - get_finalizer(self.component)(self) - except Exception as exc: - self.handle_error(exc, traceback.format_exc()) + super().finalize() - def run(self): - self.initialize() - while True: - try: - self.step() - except KeyboardInterrupt: - raise - except InactiveReadableError: - sleep(1) - # Terminated, exit loop. - break # BREAK !!! - except Empty: - continue - except Exception as exc: # pylint: disable=broad-except - self.handle_error(exc, traceback.format_exc()) +def _iter(mixed): + if isinstance(mixed, (dict, list, str)): + raise TypeError(type(mixed).__name__) + return iter(mixed) - self.finalize() - def handle_error(self, exc, trace): - self.stats['err'] += 1 - print('\U0001F4A3 {} in {}'.format(type(exc).__name__, self.component)) - print(trace) +def _resolve(input_bag, output): + # NotModified means to send the input unmodified to output. + if output is NOT_MODIFIED: + return input_bag + + # If it does not look like a bag, let's create one for easier manipulation + if hasattr(output, 'apply'): + # Already a bag? Check if we need to set parent. + if INHERIT_INPUT in output.flags: + output.set_parent(input_bag) + else: + # Not a bag? Let's encapsulate it. + output = Bag(output) + + return output diff --git a/bonobo/core/strategies/executor.py b/bonobo/core/strategies/executor.py index 47e9139..038522f 100644 --- a/bonobo/core/strategies/executor.py +++ b/bonobo/core/strategies/executor.py @@ -4,6 +4,9 @@ from concurrent.futures import ProcessPoolExecutor from concurrent.futures import ThreadPoolExecutor from bonobo.core.strategies.base import Strategy +from bonobo.util.tokens import BEGIN, END + +from ..bags import Bag class ExecutorStrategy(Strategy): @@ -16,7 +19,7 @@ class ExecutorStrategy(Strategy): def execute(self, graph, *args, plugins=None, **kwargs): context = self.create_context(graph, plugins=plugins) - context.impulse() + context.recv(BEGIN, Bag(), END) executor = self.executor_factory() @@ -28,7 +31,7 @@ class ExecutorStrategy(Strategy): for component_context in context.components: futures.append(executor.submit(component_context.run)) - while context.running: + while context.alive: time.sleep(0.2) for plugin_context in context.plugins: diff --git a/bonobo/core/strategies/naive.py b/bonobo/core/strategies/naive.py index fe78271..e39d58a 100644 --- a/bonobo/core/strategies/naive.py +++ b/bonobo/core/strategies/naive.py @@ -1,10 +1,13 @@ from bonobo.core.strategies.base import Strategy +from bonobo.util.tokens import BEGIN, END + +from ..bags import Bag class NaiveStrategy(Strategy): def execute(self, graph, *args, plugins=None, **kwargs): context = self.create_context(graph, plugins=plugins) - context.impulse() + context.recv(BEGIN, Bag(), END) # TODO: how to run plugins in "naive" mode ? diff --git a/bonobo/ext/console/plugin.py b/bonobo/ext/console/plugin.py index 5ef6469..4c5e8fb 100644 --- a/bonobo/ext/console/plugin.py +++ b/bonobo/ext/console/plugin.py @@ -76,7 +76,7 @@ class ConsoleOutputPlugin(Plugin): t_cnt = len(context) for i, component in enumerate(context): - if component.running: + if component.alive: _line = ''.join(( t.black('({})'.format(i + 1)), ' ', diff --git a/bonobo/io/json.py b/bonobo/io/json.py index 32e79f2..2df1538 100644 --- a/bonobo/io/json.py +++ b/bonobo/io/json.py @@ -1,18 +1,25 @@ import json -from bonobo.util.lifecycle import with_context, set_initializer, set_finalizer +from bonobo.util.lifecycle import with_context -__all__ = ['to_json', ] +__all__ = [ + 'from_json', + 'to_json', +] -def to_json(path_or_buf): - # todo different cases + documentation - # case 1: path_or_buf is str, we consider it filename, open and write - # case 2: pob is None, json should be yielded - # case 3: pob is stream, filelike, write, gogog. +@with_context +class JsonWriter: + def __init__(self, path_or_buf): + self.path_or_buf = path_or_buf - @with_context - def _to_json(ctx, row): + def initialize(self, ctx): + assert not hasattr(ctx, 'fp'), 'One at a time, baby.' + ctx.fp = open(self.path_or_buf, 'w+') + ctx.fp.write('[\n') + ctx.first = True + + def __call__(self, ctx, row): if ctx.first: prefix = '' ctx.first = False @@ -20,19 +27,14 @@ def to_json(path_or_buf): prefix = ',\n' ctx.fp.write(prefix + json.dumps(row)) - @set_initializer(_to_json) - def _to_json_initialize(ctx): - assert not hasattr(ctx, 'fp'), 'One at a time, baby.' - ctx.fp = open(path_or_buf, 'w+') - ctx.fp.write('[\n') - ctx.first = True - - @set_finalizer(_to_json) - def _to_json_finalize(ctx): + def finalize(self, ctx): ctx.fp.write('\n]') ctx.fp.close() del ctx.fp, ctx.first - _to_json.__name__ = 'to_json' - return _to_json +def from_json(path_or_buf): + pass + + +to_json = JsonWriter diff --git a/bonobo/util/lifecycle.py b/bonobo/util/lifecycle.py index 0080149..16acf68 100644 --- a/bonobo/util/lifecycle.py +++ b/bonobo/util/lifecycle.py @@ -25,6 +25,10 @@ get_initializer, set_initializer = _create_lifecycle_functions('initializer', 'i get_finalizer, set_finalizer = _create_lifecycle_functions('finalizer', 'finalize') -def with_context(func): - func._with_context = True - return func +class Contextual: + _with_context = True + + +def with_context(cls_or_func): + cls_or_func._with_context = True + return cls_or_func diff --git a/tests/core/test_contexts.py b/tests/core/test_contexts.py index 30ee083..aac6c56 100644 --- a/tests/core/test_contexts.py +++ b/tests/core/test_contexts.py @@ -1,6 +1,7 @@ -from bonobo import Graph, NaiveStrategy +from bonobo import Graph, NaiveStrategy, Bag from bonobo.core.contexts import ExecutionContext from bonobo.util.lifecycle import with_context +from bonobo.util.tokens import BEGIN, END def generate_integers(): @@ -28,7 +29,7 @@ def test_empty_execution_context(): assert not len(ctx.components) assert not len(ctx.plugins) - assert not ctx.running + assert not ctx.alive def test_execution(): @@ -52,8 +53,8 @@ def test_simple_execution_context(): for i, component in enumerate(chain): assert ctx[i].component is component - assert not ctx.running + assert not ctx.alive - ctx.impulse() + ctx.recv(BEGIN, Bag(), END) - assert ctx.running + assert ctx.alive diff --git a/tests/io/test_json.py b/tests/io/test_json.py index 0e3a2f9..af7f2f5 100644 --- a/tests/io/test_json.py +++ b/tests/io/test_json.py @@ -1,21 +1,19 @@ import pytest -from bonobo import to_json -from bonobo.util.lifecycle import get_initializer, get_finalizer - - -class ContextMock: - pass +from bonobo import to_json, Bag +from bonobo.core.contexts import ComponentExecutionContext +from bonobo.util.tokens import BEGIN, END def test_write_json_to_file(tmpdir): file = tmpdir.join('output.json') json_writer = to_json(str(file)) - context = ContextMock() + context = ComponentExecutionContext(json_writer, None) - get_initializer(json_writer)(context) - json_writer(context, {'foo': 'bar'}) - get_finalizer(json_writer)(context) + context.initialize() + context.recv(BEGIN, Bag({'foo': 'bar'}), END) + context.step() + context.finalize() assert file.read() == '''[ {"foo": "bar"} @@ -32,6 +30,6 @@ def test_write_json_without_initializer_should_not_work(tmpdir): file = tmpdir.join('output.json') json_writer = to_json(str(file)) - context = ContextMock() + context = ComponentExecutionContext(json_writer, None) with pytest.raises(AttributeError): json_writer(context, {'foo': 'bar'})