Starting to work on #47 a.k.a the context processor mess. This is a first implementation removing all the uggly function calls, but further work must work on less things called "context", as it is bad for readability. Concerns better separated now.
This commit is contained in:
@ -3,7 +3,7 @@ from pprint import pprint as _pprint
|
||||
|
||||
from colorama import Fore, Style
|
||||
|
||||
from bonobo.config.processors import contextual
|
||||
from bonobo.config.processors import ContextProcessor
|
||||
from bonobo.structs.bags import Bag
|
||||
from bonobo.util.objects import ValueHolder
|
||||
from bonobo.util.term import CLEAR_EOL
|
||||
@ -49,12 +49,11 @@ def Tee(f):
|
||||
return wrapped
|
||||
|
||||
|
||||
@contextual
|
||||
def count(counter, *args, **kwargs):
|
||||
counter += 1
|
||||
|
||||
|
||||
@count.add_context_processor
|
||||
@ContextProcessor.decorate(count)
|
||||
def _count_counter(self, context):
|
||||
counter = ValueHolder(0)
|
||||
yield counter
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
from bonobo.config.processors import ContextProcessor
|
||||
from bonobo.config.options import Option
|
||||
|
||||
__all__ = [
|
||||
@ -15,10 +16,14 @@ class ConfigurableMeta(type):
|
||||
super().__init__(what, bases, dict)
|
||||
cls.__options__ = {}
|
||||
cls.__positional_options__ = []
|
||||
cls.__processors__ = []
|
||||
|
||||
for typ in cls.__mro__:
|
||||
for name, value in typ.__dict__.items():
|
||||
if isinstance(value, Option):
|
||||
if isinstance(value, ContextProcessor):
|
||||
cls.__processors__.append(value)
|
||||
else:
|
||||
if not value.name:
|
||||
value.name = name
|
||||
if not name in cls.__options__:
|
||||
@ -26,6 +31,10 @@ class ConfigurableMeta(type):
|
||||
if value.positional:
|
||||
cls.__positional_options__.append(name)
|
||||
|
||||
# This can be done before, more efficiently. Not so bad neither as this is only done at type() creation time
|
||||
# (aka class Xxx(...) time) and there should not be hundreds of processors. Still not very elegant.
|
||||
cls.__processors__ = sorted(cls.__processors__, key=lambda v: v._creation_counter)
|
||||
|
||||
|
||||
class Configurable(metaclass=ConfigurableMeta):
|
||||
"""
|
||||
|
||||
@ -2,24 +2,23 @@ import functools
|
||||
|
||||
import types
|
||||
|
||||
from bonobo.util.compat import deprecated_alias
|
||||
from bonobo.util.compat import deprecated_alias, deprecated
|
||||
|
||||
from bonobo.config.options import Option
|
||||
from bonobo.util.iterators import ensure_tuple
|
||||
|
||||
_CONTEXT_PROCESSORS_ATTR = '__processors__'
|
||||
|
||||
|
||||
class ContextProcessor:
|
||||
_creation_counter = 0
|
||||
|
||||
class ContextProcessor(Option):
|
||||
@property
|
||||
def __name__(self):
|
||||
return self.func.__name__
|
||||
|
||||
def __init__(self, func):
|
||||
self.func = func
|
||||
|
||||
# This hack is necessary for python3.5
|
||||
self._creation_counter = ContextProcessor._creation_counter
|
||||
ContextProcessor._creation_counter += 1
|
||||
super(ContextProcessor, self).__init__(required=False, default=self.__name__)
|
||||
self.name = self.__name__
|
||||
|
||||
def __repr__(self):
|
||||
return repr(self.func).replace('<function', '<{}'.format(type(self).__name__))
|
||||
@ -27,11 +26,63 @@ class ContextProcessor:
|
||||
def __call__(self, *args, **kwargs):
|
||||
return self.func(*args, **kwargs)
|
||||
|
||||
@classmethod
|
||||
def decorate(cls, cls_or_func):
|
||||
try:
|
||||
cls_or_func.__processors__
|
||||
except AttributeError:
|
||||
cls_or_func.__processors__ = []
|
||||
|
||||
def decorator(processor, cls_or_func=cls_or_func):
|
||||
cls_or_func.__processors__.append(cls(processor))
|
||||
return cls_or_func
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
class ContextCurrifier:
|
||||
"""
|
||||
This is a helper to resolve processors.
|
||||
"""
|
||||
|
||||
def __init__(self, wrapped, *initial_context):
|
||||
self.wrapped = wrapped
|
||||
self.context = tuple(initial_context)
|
||||
self._stack = []
|
||||
|
||||
def setup(self, *context):
|
||||
if len(self._stack):
|
||||
raise RuntimeError('Cannot setup context currification twice.')
|
||||
for processor in resolve_processors(self.wrapped):
|
||||
_processed = processor(self.wrapped, *context, *self.context)
|
||||
_append_to_context = next(_processed)
|
||||
if _append_to_context is not None:
|
||||
self.context += ensure_tuple(_append_to_context)
|
||||
self._stack.append(_processed)
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
return self.wrapped(*self.context, *args, **kwargs)
|
||||
|
||||
def teardown(self):
|
||||
while len(self._stack):
|
||||
processor = self._stack.pop()
|
||||
try:
|
||||
# todo yield from ? how to ?
|
||||
next(processor)
|
||||
except StopIteration as exc:
|
||||
# This is normal, and wanted.
|
||||
pass
|
||||
else:
|
||||
# No error ? We should have had StopIteration ...
|
||||
raise RuntimeError('Context processors should not yield more than once.')
|
||||
|
||||
|
||||
@deprecated
|
||||
def add_context_processor(cls_or_func, context_processor):
|
||||
getattr(cls_or_func, _CONTEXT_PROCESSORS_ATTR).append(context_processor)
|
||||
|
||||
|
||||
@deprecated
|
||||
def contextual(cls_or_func):
|
||||
"""
|
||||
Make sure an element has the context processors collection.
|
||||
@ -62,6 +113,10 @@ def contextual(cls_or_func):
|
||||
|
||||
|
||||
def resolve_processors(mixed):
|
||||
try:
|
||||
yield from mixed.__processors__
|
||||
except AttributeError:
|
||||
# old code, deprecated usage
|
||||
if isinstance(mixed, types.FunctionType):
|
||||
yield from getattr(mixed, _CONTEXT_PROCESSORS_ATTR, ())
|
||||
|
||||
|
||||
@ -2,7 +2,7 @@ import traceback
|
||||
from time import sleep
|
||||
|
||||
from bonobo.config import Container
|
||||
from bonobo.config.processors import resolve_processors
|
||||
from bonobo.config.processors import resolve_processors, ContextCurrifier
|
||||
from bonobo.util.errors import print_error
|
||||
from bonobo.util.iterators import ensure_tuple
|
||||
from bonobo.util.objects import Wrapper
|
||||
@ -36,31 +36,19 @@ class LoopingExecutionContext(Wrapper):
|
||||
else:
|
||||
self.services = None
|
||||
|
||||
self._started, self._stopped, self._context, self._stack = False, False, None, []
|
||||
self._started, self._stopped, self._stack = False, False, None
|
||||
|
||||
def start(self):
|
||||
assert self.state == (False,
|
||||
False), ('{}.start() can only be called on a new node.').format(type(self).__name__)
|
||||
assert self._context is None
|
||||
self._started = True
|
||||
self._stack = ContextCurrifier(self.wrapped, *self._get_initial_context())
|
||||
|
||||
if self.parent:
|
||||
self._context = self.parent.services.args_for(self.wrapped)
|
||||
elif self.services:
|
||||
self._context = self.services.args_for(self.wrapped)
|
||||
else:
|
||||
self._context = ()
|
||||
|
||||
for processor in resolve_processors(self.wrapped):
|
||||
try:
|
||||
_processed = processor(self.wrapped, self, *self._context)
|
||||
_append_to_context = next(_processed)
|
||||
if _append_to_context is not None:
|
||||
self._context += ensure_tuple(_append_to_context)
|
||||
self._stack.setup(self)
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
self.handle_error(exc, traceback.format_exc())
|
||||
raise
|
||||
self._stack.append(_processed)
|
||||
|
||||
def loop(self):
|
||||
"""Generic loop. A bit boring. """
|
||||
@ -78,21 +66,18 @@ class LoopingExecutionContext(Wrapper):
|
||||
return
|
||||
|
||||
self._stopped = True
|
||||
if self._context is not None:
|
||||
while len(self._stack):
|
||||
processor = self._stack.pop()
|
||||
try:
|
||||
# todo yield from ? how to ?
|
||||
next(processor)
|
||||
except StopIteration as exc:
|
||||
# This is normal, and wanted.
|
||||
pass
|
||||
self._stack.teardown()
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
self.handle_error(exc, traceback.format_exc())
|
||||
raise
|
||||
else:
|
||||
# No error ? We should have had StopIteration ...
|
||||
raise RuntimeError('Context processors should not yield more than once.')
|
||||
|
||||
def handle_error(self, exc, trace):
|
||||
return print_error(exc, trace, context=self.wrapped)
|
||||
|
||||
def _get_initial_context(self):
|
||||
if self.parent:
|
||||
return self.parent.services.args_for(self.wrapped)
|
||||
if self.services:
|
||||
return self.services.args_for(self.wrapped)
|
||||
return ()
|
||||
|
||||
@ -93,7 +93,7 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
|
||||
input_bag = self.get()
|
||||
|
||||
# todo add timer
|
||||
self.handle_results(input_bag, input_bag.apply(self.wrapped, *self._context))
|
||||
self.handle_results(input_bag, input_bag.apply(self._stack))
|
||||
|
||||
def push(self, bag):
|
||||
# MAKE THIS PUBLIC API FOR CONTEXT PROCESSORS !!!
|
||||
|
||||
@ -13,7 +13,6 @@ def path_str(path):
|
||||
return path if path.startswith('/') else '/' + path
|
||||
|
||||
|
||||
@contextual
|
||||
class OpenDataSoftAPI(Configurable):
|
||||
dataset = Option(str, required=True)
|
||||
endpoint = Option(str, default='{scheme}://{netloc}{path}')
|
||||
|
||||
@ -27,7 +27,6 @@ class CsvHandler(FileHandler):
|
||||
headers = Option(tuple)
|
||||
|
||||
|
||||
@contextual
|
||||
class CsvReader(CsvHandler, FileReader):
|
||||
"""
|
||||
Reads a CSV and yield the values as dicts.
|
||||
@ -60,7 +59,6 @@ class CsvReader(CsvHandler, FileReader):
|
||||
yield dict(zip(headers.value, row))
|
||||
|
||||
|
||||
@contextual
|
||||
class CsvWriter(CsvHandler, FileWriter):
|
||||
@ContextProcessor
|
||||
def writer(self, context, fs, file, lineno):
|
||||
|
||||
@ -9,7 +9,6 @@ __all__ = [
|
||||
]
|
||||
|
||||
|
||||
@contextual
|
||||
class FileHandler(Configurable):
|
||||
"""Abstract component factory for file-related components.
|
||||
|
||||
@ -75,7 +74,6 @@ class FileReader(Reader):
|
||||
yield line.rstrip(self.eol)
|
||||
|
||||
|
||||
@contextual
|
||||
class FileWriter(Writer):
|
||||
"""Component factory for file or file-like writers.
|
||||
|
||||
|
||||
@ -21,7 +21,6 @@ class JsonReader(JsonHandler, FileReader):
|
||||
yield line
|
||||
|
||||
|
||||
@contextual
|
||||
class JsonWriter(JsonHandler, FileWriter):
|
||||
@ContextProcessor
|
||||
def envelope(self, context, fs, file, lineno):
|
||||
|
||||
27
tests/test_basics.py
Normal file
27
tests/test_basics.py
Normal file
@ -0,0 +1,27 @@
|
||||
import pprint
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import bonobo
|
||||
import pytest
|
||||
from bonobo.config.processors import ContextCurrifier
|
||||
|
||||
|
||||
def test_count():
|
||||
with pytest.raises(TypeError):
|
||||
bonobo.count()
|
||||
|
||||
context = MagicMock()
|
||||
|
||||
currified = ContextCurrifier(bonobo.count)
|
||||
currified.setup(context)
|
||||
|
||||
for i in range(42):
|
||||
currified()
|
||||
currified.teardown()
|
||||
|
||||
context.send.assert_called_once()
|
||||
bag = context.send.call_args[0][0]
|
||||
assert isinstance(bag, bonobo.Bag)
|
||||
assert 0 == len(bag.kwargs)
|
||||
assert 1 == len(bag.args)
|
||||
assert bag.args[0] == 42
|
||||
@ -1,24 +1,26 @@
|
||||
from operator import attrgetter
|
||||
|
||||
from bonobo.config.processors import ContextProcessor, contextual, resolve_processors
|
||||
from bonobo.config import Configurable
|
||||
from bonobo.config.processors import ContextProcessor, resolve_processors, ContextCurrifier
|
||||
|
||||
|
||||
@contextual
|
||||
class CP1:
|
||||
class CP1(Configurable):
|
||||
@ContextProcessor
|
||||
def c(self):
|
||||
pass
|
||||
yield
|
||||
|
||||
@ContextProcessor
|
||||
def a(self):
|
||||
pass
|
||||
yield 'this is A'
|
||||
|
||||
@ContextProcessor
|
||||
def b(self):
|
||||
pass
|
||||
def b(self, a):
|
||||
yield a.upper()[:-1] + 'b'
|
||||
|
||||
def __call__(self, a, b):
|
||||
return a, b
|
||||
|
||||
|
||||
@contextual
|
||||
class CP2(CP1):
|
||||
@ContextProcessor
|
||||
def f(self):
|
||||
@ -33,7 +35,6 @@ class CP2(CP1):
|
||||
pass
|
||||
|
||||
|
||||
@contextual
|
||||
class CP3(CP2):
|
||||
@ContextProcessor
|
||||
def c(self):
|
||||
@ -52,3 +53,11 @@ def test_inheritance_and_ordering():
|
||||
assert get_all_processors_names(CP1) == ['c', 'a', 'b']
|
||||
assert get_all_processors_names(CP2) == ['c', 'a', 'b', 'f', 'e', 'd']
|
||||
assert get_all_processors_names(CP3) == ['c', 'a', 'b', 'f', 'e', 'd', 'c', 'b']
|
||||
|
||||
|
||||
def test_setup_teardown():
|
||||
o = CP1()
|
||||
stack = ContextCurrifier(o)
|
||||
stack.setup()
|
||||
assert o(*stack.context) == ('this is A', 'THIS IS b')
|
||||
stack.teardown()
|
||||
@ -1,4 +1,4 @@
|
||||
from bonobo.config.processors import contextual
|
||||
from bonobo.config.processors import ContextProcessor
|
||||
from bonobo.constants import BEGIN, END
|
||||
from bonobo.execution.graph import GraphExecutionContext
|
||||
from bonobo.strategies import NaiveStrategy
|
||||
@ -13,12 +13,11 @@ def square(i: int) -> int:
|
||||
return i**2
|
||||
|
||||
|
||||
@contextual
|
||||
def push_result(results, i: int):
|
||||
results.append(i)
|
||||
|
||||
|
||||
@push_result.__processors__.append
|
||||
@ContextProcessor.decorate(push_result)
|
||||
def results(f, context):
|
||||
results = []
|
||||
yield results
|
||||
|
||||
Reference in New Issue
Block a user