diff --git a/bonobo/__init__.py b/bonobo/__init__.py index 628a96c..0d331c6 100644 --- a/bonobo/__init__.py +++ b/bonobo/__init__.py @@ -38,7 +38,7 @@ __all__ = [ 'Bag', 'Graph', 'NaiveStrategy', - 'NotModified', + 'NOT_MODIFIED', 'ProcessPoolExecutorStrategy', 'ThreadPoolExecutorStrategy', 'head', diff --git a/bonobo/core/bags.py b/bonobo/core/bags.py index 8f86ab1..3ae71e8 100644 --- a/bonobo/core/bags.py +++ b/bonobo/core/bags.py @@ -6,7 +6,7 @@ from bonobo.util.tokens import Token _get_args = attrgetter('args') -InheritInputFlag = Token('InheritInputFlag') +INHERIT_INPUT = Token('InheritInput') class Bag: @@ -37,8 +37,8 @@ class Bag: def flags(self): return self._flags - def apply(self, f, *args, **kwargs): - return f(*args, *self.args, **kwargs, **self.kwargs) + def apply(self, func, *args, **kwargs): + return func(*args, *self.args, **kwargs, **self.kwargs) def extend(self, *args, **kwargs): return type(self)(*args, _parent=self, **kwargs) @@ -48,7 +48,7 @@ class Bag: @classmethod def inherit(cls, *args, **kwargs): - return cls(*args, _flags=(InheritInputFlag, ), **kwargs) + return cls(*args, _flags=(INHERIT_INPUT, ), **kwargs) def __repr__(self): return '<{} ({})>'.format( diff --git a/bonobo/core/contexts.py b/bonobo/core/contexts.py index a69ab51..d0935d4 100644 --- a/bonobo/core/contexts.py +++ b/bonobo/core/contexts.py @@ -3,12 +3,12 @@ from functools import partial from queue import Empty from time import sleep -from bonobo.core.bags import Bag, InheritInputFlag +from bonobo.core.bags import Bag, INHERIT_INPUT from bonobo.core.errors import InactiveReadableError from bonobo.core.inputs import Input from bonobo.core.stats import WithStatistics from bonobo.util.lifecycle import get_initializer, get_finalizer -from bonobo.util.tokens import Begin, End, New, Running, Terminated, NotModified +from bonobo.util.tokens import BEGIN, END, NEW, RUNNING, TERMINATED, NOT_MODIFIED class ExecutionContext: @@ -23,8 +23,8 @@ class ExecutionContext: component_context.outputs = [self[j].input for j in self.graph.outputs_of(i)] except KeyError: continue - component_context.input.on_begin = partial(component_context.send, Begin, _control=True) - component_context.input.on_end = partial(component_context.send, End, _control=True) + component_context.input.on_begin = partial(component_context.send, BEGIN, _control=True) + component_context.input.on_end = partial(component_context.send, END, _control=True) def __getitem__(self, item): return self.components[item] @@ -36,10 +36,10 @@ class ExecutionContext: yield from self.components def impulse(self): - for i in self.graph.outputs_of(Begin): - self[i].recv(Begin) + for i in self.graph.outputs_of(BEGIN): + self[i].recv(BEGIN) self[i].recv(Bag()) - self[i].recv(End) + self[i].recv(END) @property def running(self): @@ -85,13 +85,13 @@ def _iter(x): def _resolve(input_bag, output): # NotModified means to send the input unmodified to output. - if output is NotModified: + if output is NOT_MODIFIED: return input_bag # If it does not look like a bag, let's create one for easier manipulation if hasattr(output, 'apply'): # Already a bag? Check if we need to set parent. - if InheritInputFlag in output.flags: + if INHERIT_INPUT in output.flags: output.set_parent(input_bag) else: # Not a bag? Let's encapsulate it. @@ -118,7 +118,7 @@ class ComponentExecutionContext(WithStatistics): self.component = component self.input = Input() self.outputs = [] - self.state = New + self.state = NEW self.stats = { 'in': 0, 'out': 0, @@ -198,10 +198,10 @@ class ComponentExecutionContext(WithStatistics): self.send(_resolve(input_bag, output)) def initialize(self): - assert self.state is New, ('A {} can only be run once, and thus is expected to be in {} state at ' - 'initialization time.').format(type(self).__name__, New) + assert self.state is NEW, ('A {} can only be run once, and thus is expected to be in {} state at ' + 'initialization time.').format(type(self).__name__, NEW) - self.state = Running + self.state = RUNNING try: get_initializer(self.component)(self) @@ -209,10 +209,10 @@ class ComponentExecutionContext(WithStatistics): self.handle_error(e, traceback.format_exc()) def finalize(self): - assert self.state is Running, ('A {} must be in {} state at finalization time.').format( - type(self).__name__, Running) + assert self.state is RUNNING, ('A {} must be in {} state at finalization time.').format( + type(self).__name__, RUNNING) - self.state = Terminated + self.state = TERMINATED try: get_finalizer(self.component)(self) except Exception as e: diff --git a/bonobo/core/graphs.py b/bonobo/core/graphs.py index df79777..e9ffeb5 100644 --- a/bonobo/core/graphs.py +++ b/bonobo/core/graphs.py @@ -1,4 +1,4 @@ -from bonobo.util.tokens import Begin +from bonobo.util.tokens import BEGIN class Graph: @@ -8,7 +8,7 @@ class Graph: def __init__(self): self.components = [] - self.graph = {Begin: set()} + self.graph = {BEGIN: set()} def outputs_of(self, idx, create=False): if create and not idx in self.graph: @@ -20,7 +20,7 @@ class Graph: self.components.append(c) return i - def add_chain(self, *components, _input=Begin): + def add_chain(self, *components, _input=BEGIN): for component in components: _next = self.add_component(component) self.outputs_of(_input, create=True).add(_next) diff --git a/bonobo/core/inputs.py b/bonobo/core/inputs.py index 6b01c2c..1680e18 100644 --- a/bonobo/core/inputs.py +++ b/bonobo/core/inputs.py @@ -19,7 +19,7 @@ from queue import Queue from bonobo.core.errors import AbstractError, InactiveWritableError, InactiveReadableError from bonobo.util import noop -from bonobo.util.tokens import Begin, End +from bonobo.util.tokens import BEGIN, END BUFFER_SIZE = 8192 @@ -53,7 +53,7 @@ class Input(Queue, Readable, Writable): def put(self, data, block=True, timeout=None): # Begin token is a metadata to raise the input runlevel. - if data == Begin: + if data == BEGIN: self._runlevel += 1 self._writable_runlevel += 1 @@ -66,7 +66,7 @@ class Input(Queue, Readable, Writable): if self._writable_runlevel < 1: raise InactiveWritableError('Cannot put() on an inactive {}.'.format(Writable.__name__)) - if data == End: + if data == END: self._writable_runlevel -= 1 return Queue.put(self, data, block, timeout) @@ -77,7 +77,7 @@ class Input(Queue, Readable, Writable): data = Queue.get(self, block, timeout) - if data == End: + if data == END: self._runlevel -= 1 # callback @@ -92,7 +92,7 @@ class Input(Queue, Readable, Writable): def empty(self): self.mutex.acquire() - while self._qsize() and self.queue[0] == End: + while self._qsize() and self.queue[0] == END: self._runlevel -= 1 Queue._get(self) self.mutex.release() diff --git a/bonobo/util/__init__.py b/bonobo/util/__init__.py index 38e1f39..d75720c 100644 --- a/bonobo/util/__init__.py +++ b/bonobo/util/__init__.py @@ -3,10 +3,10 @@ import functools import pprint -from .tokens import NotModified +from .tokens import NOT_MODIFIED __all__ = [ - 'NotModified', + 'NOT_MODIFIED', 'head', 'log', 'noop', diff --git a/bonobo/util/tokens.py b/bonobo/util/tokens.py index e8a9164..98c0fae 100644 --- a/bonobo/util/tokens.py +++ b/bonobo/util/tokens.py @@ -8,11 +8,11 @@ class Token: return '<{}>'.format(self.__name__) -Begin = Token('Begin') -End = Token('End') +BEGIN = Token('Begin') +END = Token('End') -New = Token('New') -Running = Token('Running') -Terminated = Token('Terminated') +NEW = Token('New') +RUNNING = Token('Running') +TERMINATED = Token('Terminated') -NotModified = Token('NotModified') +NOT_MODIFIED = Token('NotModified') diff --git a/tests/core/test_bags.py b/tests/core/test_bags.py index 87bc2b0..5998381 100644 --- a/tests/core/test_bags.py +++ b/tests/core/test_bags.py @@ -1,7 +1,7 @@ from mock import Mock from bonobo import Bag -from bonobo.core.bags import InheritInputFlag +from bonobo.core.bags import INHERIT_INPUT args = ( 'foo', @@ -42,7 +42,7 @@ def test_inherit(): 'a', 'b', ) assert bag2.kwargs == {'a': 1, 'b': 2} - assert InheritInputFlag in bag2.flags + assert INHERIT_INPUT in bag2.flags assert bag3.args == ( 'a', diff --git a/tests/core/test_graphs.py b/tests/core/test_graphs.py index e9f67c7..47a7dbd 100644 --- a/tests/core/test_graphs.py +++ b/tests/core/test_graphs.py @@ -1,7 +1,7 @@ import pytest from bonobo.core.graphs import Graph -from bonobo.util.tokens import Begin +from bonobo.util.tokens import BEGIN identity = lambda x: x @@ -10,7 +10,7 @@ def test_graph_outputs_of(): g = Graph() # default graph only node - assert len(g.outputs_of(Begin)) == 0 + assert len(g.outputs_of(BEGIN)) == 0 # unexisting node with pytest.raises(KeyError): @@ -40,4 +40,4 @@ def test_graph_add_chain(): g.add_chain(identity, identity, identity) assert len(g.components) == 3 - assert len(g.outputs_of(Begin)) == 1 + assert len(g.outputs_of(BEGIN)) == 1 diff --git a/tests/core/test_inputs.py b/tests/core/test_inputs.py index 4a591d3..6b421e3 100644 --- a/tests/core/test_inputs.py +++ b/tests/core/test_inputs.py @@ -20,7 +20,7 @@ import pytest from bonobo.core.errors import InactiveWritableError, InactiveReadableError from bonobo.core.inputs import Input -from bonobo.util.tokens import Begin, End +from bonobo.util.tokens import BEGIN, END def test_input_runlevels(): @@ -32,15 +32,15 @@ def test_input_runlevels(): q.put('hello, unborn queue.') # Begin - q.put(Begin) + q.put(BEGIN) assert q.alive and q._runlevel == 1 q.put('foo') # Second Begin - q.put(Begin) + q.put(BEGIN) assert q.alive and q._runlevel == 2 q.put('bar') - q.put(End) + q.put(END) # FIFO assert q.get() == 'foo' @@ -56,7 +56,7 @@ def test_input_runlevels(): q.put('baz') # Now kill the queue... - q.put(End) + q.put(END) with pytest.raises(InactiveWritableError): q.put('foo')