testing bags
This commit is contained in:
@ -1,4 +1,4 @@
|
||||
from .bags import Bag, Inherit
|
||||
from .bags import Bag
|
||||
from .graphs import Graph
|
||||
from .services import inject, service
|
||||
from .strategies.executor import ThreadPoolExecutorStrategy, ProcessPoolExecutorStrategy
|
||||
@ -7,7 +7,6 @@ from .strategies.naive import NaiveStrategy
|
||||
__all__ = [
|
||||
'Bag',
|
||||
'Graph',
|
||||
'Inherit',
|
||||
'NaiveStrategy',
|
||||
'ProcessPoolExecutorStrategy',
|
||||
'ThreadPoolExecutorStrategy',
|
||||
|
||||
@ -1,19 +1,58 @@
|
||||
from operator import attrgetter
|
||||
|
||||
import itertools
|
||||
|
||||
from bonobo.util.tokens import Token
|
||||
|
||||
_get_args = attrgetter('args')
|
||||
|
||||
InheritInputFlag = Token('InheritInputFlag')
|
||||
|
||||
|
||||
class Bag:
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
def __init__(self, *args, _flags=None, _parent=None, **kwargs):
|
||||
self._flags = _flags or ()
|
||||
self._parent = _parent
|
||||
self._args = args
|
||||
self._kwargs = kwargs
|
||||
|
||||
@property
|
||||
def args(self):
|
||||
if self._parent is None:
|
||||
return self._args
|
||||
return (
|
||||
*self._parent.args,
|
||||
*self._args, )
|
||||
|
||||
@property
|
||||
def kwargs(self):
|
||||
if self._parent is None:
|
||||
return self._kwargs
|
||||
return {
|
||||
** self._parent.kwargs,
|
||||
** self._kwargs,
|
||||
}
|
||||
|
||||
@property
|
||||
def flags(self):
|
||||
return self._flags
|
||||
|
||||
def apply(self, f, *args, **kwargs):
|
||||
return f(*args, *self.args, **kwargs, **self.kwargs)
|
||||
|
||||
def extend(self, *args, **kwargs):
|
||||
return type(self)(*args, _parent=self, **kwargs)
|
||||
|
||||
def set_parent(self, parent):
|
||||
self._parent = parent
|
||||
|
||||
@classmethod
|
||||
def inherit(cls, *args, **kwargs):
|
||||
return cls(*args, _flags=(InheritInputFlag, ), **kwargs)
|
||||
|
||||
def __repr__(self):
|
||||
return '<{} *{} **{}>'.format(type(self).__name__, self.args, self.kwargs)
|
||||
|
||||
|
||||
class Inherit(Bag):
|
||||
def override(self, input):
|
||||
self.args = input.args + self.args
|
||||
kwargs = dict(input.kwargs)
|
||||
kwargs.update(self.kwargs)
|
||||
self.kwargs = kwargs
|
||||
return self
|
||||
return '<{} ({})>'.format(
|
||||
type(self).__name__, ', '.join(
|
||||
itertools.chain(
|
||||
map(repr, self.args),
|
||||
('{}={}'.format(k, repr(v)) for k, v in self.kwargs.items()), )))
|
||||
|
||||
@ -3,7 +3,7 @@ from functools import partial
|
||||
from queue import Empty
|
||||
from time import sleep
|
||||
|
||||
from bonobo.core.bags import Bag
|
||||
from bonobo.core.bags import Bag, InheritInputFlag
|
||||
from bonobo.core.errors import InactiveReadableError
|
||||
from bonobo.core.inputs import Input
|
||||
from bonobo.core.stats import WithStatistics
|
||||
@ -139,7 +139,6 @@ class ComponentExecutionContext(WithStatistics):
|
||||
|
||||
def _call(self, bag_or_arg):
|
||||
# todo add timer
|
||||
bag = bag_or_arg if hasattr(bag_or_arg, 'apply') else Bag(bag_or_arg)
|
||||
if getattr(self.component, '_with_context', False):
|
||||
return bag.apply(self.component, self)
|
||||
return bag.apply(self.component)
|
||||
@ -149,17 +148,27 @@ class ComponentExecutionContext(WithStatistics):
|
||||
"""Runs a transformation callable with given args/kwargs and flush the result into the right
|
||||
output channel."""
|
||||
|
||||
input_row = self.get()
|
||||
input_bag = self.get()
|
||||
|
||||
def _resolve(result):
|
||||
nonlocal input_row
|
||||
if result is NotModified:
|
||||
return input_row
|
||||
if hasattr(result, 'override'):
|
||||
return result.override(input_row)
|
||||
return result
|
||||
def _resolve(output):
|
||||
nonlocal input_bag
|
||||
|
||||
results = self._call(input_row)
|
||||
# NotModified means to send the input unmodified to output.
|
||||
if output is NotModified:
|
||||
return input_bag
|
||||
|
||||
# If it does not look like a bag, let's create one for easier manipulation
|
||||
if hasattr(output, 'apply'):
|
||||
# Already a bag? Check if we need to set parent.
|
||||
if InheritInputFlag in output.flags:
|
||||
output.set_parent(input_bag)
|
||||
else:
|
||||
# Not a bag? Let's encapsulate it.
|
||||
output = Bag(result)
|
||||
|
||||
return output
|
||||
|
||||
results = self._call(input_bag)
|
||||
|
||||
# self._exec_time += timer.duration
|
||||
# Put data onto output channels
|
||||
|
||||
Reference in New Issue
Block a user