Merge pull request #8 from hartym/master

cq
This commit is contained in:
Romain Dorgueil
2016-12-26 14:17:56 +01:00
committed by GitHub
13 changed files with 88 additions and 77 deletions

View File

@ -1,7 +1,7 @@
# This file has been auto-generated. # This file has been auto-generated.
# All changes will be lost, see Projectfile. # All changes will be lost, see Projectfile.
# #
# Updated at 2016-12-26 11:52:20.154236 # Updated at 2016-12-26 14:09:09.609408
PYTHON ?= $(shell which python) PYTHON ?= $(shell which python)
PYTHON_BASENAME ?= $(shell basename $(PYTHON)) PYTHON_BASENAME ?= $(shell basename $(PYTHON))

View File

@ -38,7 +38,7 @@ __all__ = [
'Bag', 'Bag',
'Graph', 'Graph',
'NaiveStrategy', 'NaiveStrategy',
'NotModified', 'NOT_MODIFIED',
'ProcessPoolExecutorStrategy', 'ProcessPoolExecutorStrategy',
'ThreadPoolExecutorStrategy', 'ThreadPoolExecutorStrategy',
'head', 'head',

View File

@ -4,9 +4,7 @@ import itertools
from bonobo.util.tokens import Token from bonobo.util.tokens import Token
_get_args = attrgetter('args') INHERIT_INPUT = Token('InheritInput')
InheritInputFlag = Token('InheritInputFlag')
class Bag: class Bag:
@ -37,8 +35,8 @@ class Bag:
def flags(self): def flags(self):
return self._flags return self._flags
def apply(self, f, *args, **kwargs): def apply(self, func, *args, **kwargs):
return f(*args, *self.args, **kwargs, **self.kwargs) return func(*args, *self.args, **kwargs, **self.kwargs)
def extend(self, *args, **kwargs): def extend(self, *args, **kwargs):
return type(self)(*args, _parent=self, **kwargs) return type(self)(*args, _parent=self, **kwargs)
@ -48,7 +46,7 @@ class Bag:
@classmethod @classmethod
def inherit(cls, *args, **kwargs): def inherit(cls, *args, **kwargs):
return cls(*args, _flags=(InheritInputFlag, ), **kwargs) return cls(*args, _flags=(INHERIT_INPUT, ), **kwargs)
def __repr__(self): def __repr__(self):
return '<{} ({})>'.format( return '<{} ({})>'.format(

View File

@ -3,12 +3,12 @@ from functools import partial
from queue import Empty from queue import Empty
from time import sleep from time import sleep
from bonobo.core.bags import Bag, InheritInputFlag from bonobo.core.bags import Bag, INHERIT_INPUT
from bonobo.core.errors import InactiveReadableError from bonobo.core.errors import InactiveReadableError
from bonobo.core.inputs import Input from bonobo.core.inputs import Input
from bonobo.core.stats import WithStatistics from bonobo.core.stats import WithStatistics
from bonobo.util.lifecycle import get_initializer, get_finalizer from bonobo.util.lifecycle import get_initializer, get_finalizer
from bonobo.util.tokens import Begin, End, New, Running, Terminated, NotModified from bonobo.util.tokens import BEGIN, END, NEW, RUNNING, TERMINATED, NOT_MODIFIED
class ExecutionContext: class ExecutionContext:
@ -23,8 +23,8 @@ class ExecutionContext:
component_context.outputs = [self[j].input for j in self.graph.outputs_of(i)] component_context.outputs = [self[j].input for j in self.graph.outputs_of(i)]
except KeyError: except KeyError:
continue continue
component_context.input.on_begin = partial(component_context.send, Begin, _control=True) component_context.input.on_begin = partial(component_context.send, BEGIN, _control=True)
component_context.input.on_end = partial(component_context.send, End, _control=True) component_context.input.on_end = partial(component_context.send, END, _control=True)
def __getitem__(self, item): def __getitem__(self, item):
return self.components[item] return self.components[item]
@ -36,10 +36,10 @@ class ExecutionContext:
yield from self.components yield from self.components
def impulse(self): def impulse(self):
for i in self.graph.outputs_of(Begin): for i in self.graph.outputs_of(BEGIN):
self[i].recv(Begin) self[i].recv(BEGIN)
self[i].recv(Bag()) self[i].recv(Bag())
self[i].recv(End) self[i].recv(END)
@property @property
def running(self): def running(self):
@ -52,46 +52,58 @@ class PluginExecutionContext:
self.plugin = plugin self.plugin = plugin
self.alive = True self.alive = True
def run(self): def initialize(self):
# pylint: disable=broad-except
try: try:
get_initializer(self.plugin)(self) get_initializer(self.plugin)(self)
except Exception as exc: except Exception as exc:
print('error in initializer', type(exc), exc) self.handle_error(exc, traceback.format_exc())
def finalize(self):
# pylint: disable=broad-except
try:
get_finalizer(self.plugin)(self)
except Exception as exc:
self.handle_error(exc, traceback.format_exc())
def run(self):
self.initialize()
while self.alive: while self.alive:
# todo with wrap_errors .... # todo with wrap_errors ....
try: try:
self.plugin.run(self) self.plugin.run(self)
except Exception as exc: except Exception as exc: # pylint: disable=broad-except
print('error', type(exc), exc) self.handle_error(exc, traceback.format_exc())
sleep(0.25) sleep(0.25)
try: self.finalize()
get_finalizer(self.plugin)(self)
except Exception as exc:
print('error in finalizer', type(exc), exc)
def shutdown(self): def shutdown(self):
self.alive = False self.alive = False
def handle_error(self, exc, trace):
print('\U0001F4A3 {} in plugin {}'.format(type(exc).__name__, self.plugin))
print(trace)
def _iter(x):
if isinstance(x, (dict, list, str)): def _iter(mixed):
raise TypeError(type(x).__name__) if isinstance(mixed, (dict, list, str)):
return iter(x) raise TypeError(type(mixed).__name__)
return iter(mixed)
def _resolve(input_bag, output): def _resolve(input_bag, output):
# NotModified means to send the input unmodified to output. # NotModified means to send the input unmodified to output.
if output is NotModified: if output is NOT_MODIFIED:
return input_bag return input_bag
# If it does not look like a bag, let's create one for easier manipulation # If it does not look like a bag, let's create one for easier manipulation
if hasattr(output, 'apply'): if hasattr(output, 'apply'):
# Already a bag? Check if we need to set parent. # Already a bag? Check if we need to set parent.
if InheritInputFlag in output.flags: if INHERIT_INPUT in output.flags:
output.set_parent(input_bag) output.set_parent(input_bag)
else: else:
# Not a bag? Let's encapsulate it. # Not a bag? Let's encapsulate it.
@ -118,7 +130,7 @@ class ComponentExecutionContext(WithStatistics):
self.component = component self.component = component
self.input = Input() self.input = Input()
self.outputs = [] self.outputs = []
self.state = New self.state = NEW
self.stats = { self.stats = {
'in': 0, 'in': 0,
'out': 0, 'out': 0,
@ -198,25 +210,26 @@ class ComponentExecutionContext(WithStatistics):
self.send(_resolve(input_bag, output)) self.send(_resolve(input_bag, output))
def initialize(self): def initialize(self):
assert self.state is New, ('A {} can only be run once, and thus is expected to be in {} state at ' # pylint: disable=broad-except
'initialization time.').format(type(self).__name__, New) assert self.state is NEW, ('A {} can only be run once, and thus is expected to be in {} state at '
'initialization time.').format(type(self).__name__, NEW)
self.state = Running self.state = RUNNING
try: try:
get_initializer(self.component)(self) get_initializer(self.component)(self)
except Exception as e: except Exception as exc:
self.handle_error(e, traceback.format_exc()) self.handle_error(exc, traceback.format_exc())
def finalize(self): def finalize(self):
assert self.state is Running, ('A {} must be in {} state at finalization time.').format( # pylint: disable=broad-except
type(self).__name__, Running) assert self.state is RUNNING, ('A {} must be in {} state at finalization time.').format(
type(self).__name__, RUNNING)
self.state = TERMINATED
self.state = Terminated
try: try:
get_finalizer(self.component)(self) get_finalizer(self.component)(self)
except Exception as e: except Exception as exc:
self.handle_error(e, traceback.format_exc()) self.handle_error(exc, traceback.format_exc())
def run(self): def run(self):
self.initialize() self.initialize()
@ -230,14 +243,14 @@ class ComponentExecutionContext(WithStatistics):
sleep(1) sleep(1)
# Terminated, exit loop. # Terminated, exit loop.
break # BREAK !!! break # BREAK !!!
except Empty as e: except Empty:
continue continue
except Exception as e: except Exception as exc: # pylint: disable=broad-except
self.handle_error(e, traceback.format_exc()) self.handle_error(exc, traceback.format_exc())
self.finalize() self.finalize()
def handle_error(self, exc, tb): def handle_error(self, exc, trace):
self.stats['err'] += 1 self.stats['err'] += 1
print('\U0001F4A3 {} in {}'.format(type(exc).__name__, self.component)) print('\U0001F4A3 {} in {}'.format(type(exc).__name__, self.component))
print(tb) print(trace)

View File

@ -1,4 +1,4 @@
from bonobo.util.tokens import Begin from bonobo.util.tokens import BEGIN
class Graph: class Graph:
@ -8,7 +8,7 @@ class Graph:
def __init__(self): def __init__(self):
self.components = [] self.components = []
self.graph = {Begin: set()} self.graph = {BEGIN: set()}
def outputs_of(self, idx, create=False): def outputs_of(self, idx, create=False):
if create and not idx in self.graph: if create and not idx in self.graph:
@ -20,7 +20,7 @@ class Graph:
self.components.append(c) self.components.append(c)
return i return i
def add_chain(self, *components, _input=Begin): def add_chain(self, *components, _input=BEGIN):
for component in components: for component in components:
_next = self.add_component(component) _next = self.add_component(component)
self.outputs_of(_input, create=True).add(_next) self.outputs_of(_input, create=True).add(_next)

View File

@ -19,7 +19,7 @@ from queue import Queue
from bonobo.core.errors import AbstractError, InactiveWritableError, InactiveReadableError from bonobo.core.errors import AbstractError, InactiveWritableError, InactiveReadableError
from bonobo.util import noop from bonobo.util import noop
from bonobo.util.tokens import Begin, End from bonobo.util.tokens import BEGIN, END
BUFFER_SIZE = 8192 BUFFER_SIZE = 8192
@ -53,7 +53,7 @@ class Input(Queue, Readable, Writable):
def put(self, data, block=True, timeout=None): def put(self, data, block=True, timeout=None):
# Begin token is a metadata to raise the input runlevel. # Begin token is a metadata to raise the input runlevel.
if data == Begin: if data == BEGIN:
self._runlevel += 1 self._runlevel += 1
self._writable_runlevel += 1 self._writable_runlevel += 1
@ -66,7 +66,7 @@ class Input(Queue, Readable, Writable):
if self._writable_runlevel < 1: if self._writable_runlevel < 1:
raise InactiveWritableError('Cannot put() on an inactive {}.'.format(Writable.__name__)) raise InactiveWritableError('Cannot put() on an inactive {}.'.format(Writable.__name__))
if data == End: if data == END:
self._writable_runlevel -= 1 self._writable_runlevel -= 1
return Queue.put(self, data, block, timeout) return Queue.put(self, data, block, timeout)
@ -77,7 +77,7 @@ class Input(Queue, Readable, Writable):
data = Queue.get(self, block, timeout) data = Queue.get(self, block, timeout)
if data == End: if data == END:
self._runlevel -= 1 self._runlevel -= 1
# callback # callback
@ -92,7 +92,7 @@ class Input(Queue, Readable, Writable):
def empty(self): def empty(self):
self.mutex.acquire() self.mutex.acquire()
while self._qsize() and self.queue[0] == End: while self._qsize() and self.queue[0] == END:
self._runlevel -= 1 self._runlevel -= 1
Queue._get(self) Queue._get(self)
self.mutex.release() self.mutex.release()

View File

@ -3,10 +3,10 @@
import functools import functools
import pprint import pprint
from .tokens import NotModified from .tokens import NOT_MODIFIED
__all__ = [ __all__ = [
'NotModified', 'NOT_MODIFIED',
'head', 'head',
'log', 'log',
'noop', 'noop',
@ -40,5 +40,5 @@ def tee(f):
log = tee(pprint.pprint) log = tee(pprint.pprint)
def noop(*args, **kwargs): def noop(*args, **kwargs): # pylint: disable=unused-argument
pass pass

View File

@ -25,6 +25,6 @@ get_initializer, set_initializer = _create_lifecycle_functions('initializer', 'i
get_finalizer, set_finalizer = _create_lifecycle_functions('finalizer', 'finalize') get_finalizer, set_finalizer = _create_lifecycle_functions('finalizer', 'finalize')
def with_context(f): def with_context(func):
f._with_context = True func._with_context = True
return f return func

View File

@ -8,11 +8,11 @@ class Token:
return '<{}>'.format(self.__name__) return '<{}>'.format(self.__name__)
Begin = Token('Begin') BEGIN = Token('Begin')
End = Token('End') END = Token('End')
New = Token('New') NEW = Token('New')
Running = Token('Running') RUNNING = Token('Running')
Terminated = Token('Terminated') TERMINATED = Token('Terminated')
NotModified = Token('NotModified') NOT_MODIFIED = Token('NotModified')

View File

@ -14,7 +14,7 @@ def read(filename, flt=None):
try: try:
version = read('version.txt') version = read('version.txt')
except: except: # pylint: disable=bare-except
version = 'dev' version = 'dev'
setup( setup(

View File

@ -1,7 +1,7 @@
from mock import Mock from mock import Mock
from bonobo import Bag from bonobo import Bag
from bonobo.core.bags import InheritInputFlag from bonobo.core.bags import INHERIT_INPUT
args = ( args = (
'foo', 'foo',
@ -42,7 +42,7 @@ def test_inherit():
'a', 'a',
'b', ) 'b', )
assert bag2.kwargs == {'a': 1, 'b': 2} assert bag2.kwargs == {'a': 1, 'b': 2}
assert InheritInputFlag in bag2.flags assert INHERIT_INPUT in bag2.flags
assert bag3.args == ( assert bag3.args == (
'a', 'a',

View File

@ -1,7 +1,7 @@
import pytest import pytest
from bonobo.core.graphs import Graph from bonobo.core.graphs import Graph
from bonobo.util.tokens import Begin from bonobo.util.tokens import BEGIN
identity = lambda x: x identity = lambda x: x
@ -10,7 +10,7 @@ def test_graph_outputs_of():
g = Graph() g = Graph()
# default graph only node # default graph only node
assert len(g.outputs_of(Begin)) == 0 assert len(g.outputs_of(BEGIN)) == 0
# unexisting node # unexisting node
with pytest.raises(KeyError): with pytest.raises(KeyError):
@ -40,4 +40,4 @@ def test_graph_add_chain():
g.add_chain(identity, identity, identity) g.add_chain(identity, identity, identity)
assert len(g.components) == 3 assert len(g.components) == 3
assert len(g.outputs_of(Begin)) == 1 assert len(g.outputs_of(BEGIN)) == 1

View File

@ -20,7 +20,7 @@ import pytest
from bonobo.core.errors import InactiveWritableError, InactiveReadableError from bonobo.core.errors import InactiveWritableError, InactiveReadableError
from bonobo.core.inputs import Input from bonobo.core.inputs import Input
from bonobo.util.tokens import Begin, End from bonobo.util.tokens import BEGIN, END
def test_input_runlevels(): def test_input_runlevels():
@ -32,15 +32,15 @@ def test_input_runlevels():
q.put('hello, unborn queue.') q.put('hello, unborn queue.')
# Begin # Begin
q.put(Begin) q.put(BEGIN)
assert q.alive and q._runlevel == 1 assert q.alive and q._runlevel == 1
q.put('foo') q.put('foo')
# Second Begin # Second Begin
q.put(Begin) q.put(BEGIN)
assert q.alive and q._runlevel == 2 assert q.alive and q._runlevel == 2
q.put('bar') q.put('bar')
q.put(End) q.put(END)
# FIFO # FIFO
assert q.get() == 'foo' assert q.get() == 'foo'
@ -56,7 +56,7 @@ def test_input_runlevels():
q.put('baz') q.put('baz')
# Now kill the queue... # Now kill the queue...
q.put(End) q.put(END)
with pytest.raises(InactiveWritableError): with pytest.raises(InactiveWritableError):
q.put('foo') q.put('foo')