implements bags, so we can pass arbitrary args/kwargs to functions.

This commit is contained in:
Romain Dorgueil
2016-12-25 12:40:28 +01:00
parent 9c4ec68b18
commit a3adb044bf
19 changed files with 151 additions and 120 deletions

View File

@ -1,17 +1,10 @@
import sys
from .core import Graph, NaiveStrategy, ProcessPoolExecutorStrategy, ThreadPoolExecutorStrategy, inject, service
from .core import *
from .io import *
from .util import *
PY35 = (sys.version_info >= (3, 5))
assert PY35, 'Python 3.5+ is required to use Bonobo.'
__all__ = [
Graph,
NaiveStrategy,
ProcessPoolExecutorStrategy,
ThreadPoolExecutorStrategy,
inject,
service,
]
__version__ = '0.0.0'

View File

@ -1,13 +1,16 @@
from .bags import Bag, Inherit
from .graphs import Graph
from .services import inject, service
from .strategies.executor import ThreadPoolExecutorStrategy, ProcessPoolExecutorStrategy
from .strategies.naive import NaiveStrategy
__all__ = [
Graph,
NaiveStrategy,
ProcessPoolExecutorStrategy,
ThreadPoolExecutorStrategy,
inject,
service,
'Bag',
'Graph',
'Inherit',
'NaiveStrategy',
'ProcessPoolExecutorStrategy',
'ThreadPoolExecutorStrategy',
'inject',
'service',
]

19
bonobo/core/bags.py Normal file
View File

@ -0,0 +1,19 @@
class Bag:
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def apply(self, f, *args, **kwargs):
return f(*args, *self.args, **kwargs, **self.kwargs)
def __repr__(self):
return '<{} *{} **{}>'.format(type(self).__name__, self.args, self.kwargs)
class Inherit(Bag):
def override(self, input):
self.args = input.args + self.args
kwargs = dict(input.kwargs)
kwargs.update(self.kwargs)
self.kwargs = kwargs
return self

View File

@ -3,11 +3,12 @@ from functools import partial
from queue import Empty
from time import sleep
from bonobo.core.bags import Bag
from bonobo.core.errors import InactiveReadableError
from bonobo.core.inputs import Input
from bonobo.core.stats import WithStatistics
from bonobo.util.lifecycle import get_initializer, get_finalizer
from bonobo.util.tokens import BEGIN, END, NEW, RUNNING, TERMINATED
from bonobo.util.tokens import Begin, End, New, Running, Terminated, NotModified
class ExecutionContext:
@ -22,8 +23,8 @@ class ExecutionContext:
component_context.outputs = [self[j].input for j in self.graph.outputs_of(i)]
except KeyError as e:
continue
component_context.input.on_begin = partial(component_context.send, BEGIN, _control=True)
component_context.input.on_end = partial(component_context.send, END, _control=True)
component_context.input.on_begin = partial(component_context.send, Begin, _control=True)
component_context.input.on_end = partial(component_context.send, End, _control=True)
def __getitem__(self, item):
return self.components[item]
@ -94,7 +95,7 @@ class ComponentExecutionContext(WithStatistics):
self.component = component
self.input = Input()
self.outputs = []
self.state = NEW
self.state = New
self.stats = {
'in': 0,
'out': 0,
@ -132,27 +133,33 @@ class ComponentExecutionContext(WithStatistics):
self.input.put(value)
def get(self):
row = self.input.get(timeout=1)
return row
# todo XXX if timeout, in stat is erroneous
self.stats['in'] += 1
return self.input.get(timeout=1)
def _call(self, row):
# timer = Timer()
# with timer:
args = () if row is None else (row, )
def _call(self, bag_or_arg):
# todo add timer
bag = bag_or_arg if hasattr(bag_or_arg, 'apply') else Bag(bag_or_arg)
if getattr(self.component, '_with_context', False):
return self.component(self, *args)
return self.component(*args)
return bag.apply(self.component, self)
return bag.apply(self.component)
def step(self):
# Pull data from the first available input channel.
"""Runs a transformation callable with given args/kwargs and flush the result into the right
output channel."""
row = self.get()
self.stats['in'] += 1
input_row = self.get()
results = self._call(row)
def _resolve(result):
nonlocal input_row
if result is NotModified:
return input_row
if hasattr(result, 'override'):
return result.override(input_row)
return result
results = self._call(input_row)
# self._exec_time += timer.duration
# Put data onto output channels
@ -160,7 +167,7 @@ class ComponentExecutionContext(WithStatistics):
results = iterable(results)
except TypeError:
if results:
self.send(results)
self.send(_resolve(results))
else:
# case with no result, an execution went through anyway, use for stats.
# self._exec_count += 1
@ -171,13 +178,13 @@ class ComponentExecutionContext(WithStatistics):
result = next(results)
except StopIteration as e:
break
self.send(result)
self.send(_resolve(result))
def run(self):
assert self.state is NEW, ('A {} can only be run once, and thus is expected to be in {} state at the '
'beginning of a run().').format(type(self).__name__, NEW)
assert self.state is New, ('A {} can only be run once, and thus is expected to be in {} state at the '
'beginning of a run().').format(type(self).__name__, New)
self.state = RUNNING
self.state = Running
try:
get_initializer(self.component)(self)
except Exception as e:
@ -197,10 +204,10 @@ class ComponentExecutionContext(WithStatistics):
except Exception as e:
self.handle_error(e, traceback.format_exc())
assert self.state is RUNNING, ('A {} must be in {} state when finalization starts.').format(
type(self).__name__, RUNNING)
assert self.state is Running, ('A {} must be in {} state when finalization starts.').format(
type(self).__name__, Running)
self.state = TERMINATED
self.state = Terminated
try:
get_finalizer(self.component)(self)
except Exception as e:

View File

@ -1,4 +1,4 @@
from bonobo.util.tokens import BEGIN
from bonobo.util.tokens import Begin
class Graph:
@ -8,7 +8,7 @@ class Graph:
def __init__(self):
self.components = []
self.graph = {BEGIN: set()}
self.graph = {Begin: set()}
def outputs_of(self, idx, create=False):
if create and not idx in self.graph:
@ -20,7 +20,7 @@ class Graph:
self.components.append(c)
return i
def add_chain(self, *components, input=BEGIN):
def add_chain(self, *components, input=Begin):
for component in components:
next = self.add_component(component)
self.outputs_of(input, create=True).add(next)

View File

@ -19,7 +19,7 @@ from queue import Queue
from bonobo.core.errors import AbstractError, InactiveWritableError, InactiveReadableError
from bonobo.util import noop
from bonobo.util.tokens import BEGIN, END
from bonobo.util.tokens import Begin, End
BUFFER_SIZE = 8192
@ -53,7 +53,7 @@ class Input(Queue, Readable, Writable):
def put(self, data, block=True, timeout=None):
# Begin token is a metadata to raise the input runlevel.
if data == BEGIN:
if data == Begin:
self._runlevel += 1
self._writable_runlevel += 1
@ -66,7 +66,7 @@ class Input(Queue, Readable, Writable):
if self._writable_runlevel < 1:
raise InactiveWritableError('Cannot put() on an inactive {}.'.format(Writable.__name__))
if data == END:
if data == End:
self._writable_runlevel -= 1
return Queue.put(self, data, block, timeout)
@ -77,7 +77,7 @@ class Input(Queue, Readable, Writable):
data = Queue.get(self, block, timeout)
if data == END:
if data == End:
self._runlevel -= 1
# callback
@ -92,7 +92,7 @@ class Input(Queue, Readable, Writable):
def empty(self):
self.mutex.acquire()
while self._qsize() and self.queue[0] == END:
while self._qsize() and self.queue[0] == End:
self._runlevel -= 1
Queue._get(self)
self.mutex.release()

View File

@ -3,8 +3,9 @@ from concurrent.futures import Executor
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from bonobo.core.bags import Bag
from bonobo.core.strategies.base import Strategy
from bonobo.util.tokens import BEGIN, END
from bonobo.util.tokens import Begin, End
class ExecutorStrategy(Strategy):
@ -19,10 +20,10 @@ class ExecutorStrategy(Strategy):
context = self.create_context(graph, plugins=plugins)
executor = self.executor_factory()
for i in graph.outputs_of(BEGIN):
context[i].recv(BEGIN)
context[i].recv(None)
context[i].recv(END)
for i in graph.outputs_of(Begin):
context[i].recv(Begin)
context[i].recv(Bag())
context[i].recv(End)
futures = []

View File

@ -0,0 +1 @@
from .json import *

View File

@ -2,6 +2,8 @@ import json
from bonobo.util.lifecycle import with_context, set_initializer, set_finalizer
__all__ = ['to_json', ]
def to_json(path_or_buf):
# todo different cases + documentation

View File

@ -1,6 +1,16 @@
import functools
import pprint
from .tokens import NotModified
__all__ = [
'NotModified',
'head',
'log',
'noop',
'tee',
]
def head(n=10):
i = 0

View File

@ -8,8 +8,11 @@ class Token:
return '<{}>'.format(self.__name__)
BEGIN = Token('Begin')
END = Token('End')
NEW = Token('New')
RUNNING = Token('Running')
TERMINATED = Token('Terminated')
Begin = Token('Begin')
End = Token('End')
New = Token('New')
Running = Token('Running')
Terminated = Token('Terminated')
NotModified = Token('NotModified')