Check partially configured transformations that are function based (aka transformation factories) on execution context setup.
wip: make sure all api is exported, raise an error when a function based transformation factory is (incorrectly) used as a transformation.
This commit is contained in:
@ -1,25 +1,6 @@
|
||||
from bonobo.execution.strategies import create_strategy
|
||||
from bonobo.nodes import (
|
||||
CsvReader,
|
||||
CsvWriter,
|
||||
FileReader,
|
||||
FileWriter,
|
||||
Filter,
|
||||
FixedWindow,
|
||||
JsonReader,
|
||||
JsonWriter,
|
||||
Limit,
|
||||
PickleReader,
|
||||
PickleWriter,
|
||||
PrettyPrinter,
|
||||
RateLimited,
|
||||
SetFields,
|
||||
Tee,
|
||||
count,
|
||||
identity,
|
||||
noop,
|
||||
)
|
||||
from bonobo.nodes import LdjsonReader, LdjsonWriter
|
||||
from bonobo.nodes import __all__ as _all_nodes
|
||||
from bonobo.nodes import *
|
||||
from bonobo.structs import Graph
|
||||
from bonobo.util import get_name
|
||||
from bonobo.util.environ import parse_args, get_argument_parser
|
||||
@ -51,9 +32,13 @@ def register_graph_api(x, __all__=__all__):
|
||||
return register_api(x, __all__=__all__)
|
||||
|
||||
|
||||
def register_api_group(*args):
|
||||
def register_api_group(*args, check=None):
|
||||
check = set(check) if check else None
|
||||
for attr in args:
|
||||
register_api(attr)
|
||||
if check:
|
||||
check.remove(get_name(attr))
|
||||
assert not (check and len(check))
|
||||
|
||||
|
||||
@register_graph_api
|
||||
@ -170,6 +155,7 @@ register_api_group(
|
||||
FileWriter,
|
||||
Filter,
|
||||
FixedWindow,
|
||||
Format,
|
||||
JsonReader,
|
||||
JsonWriter,
|
||||
LdjsonReader,
|
||||
@ -179,11 +165,14 @@ register_api_group(
|
||||
PickleWriter,
|
||||
PrettyPrinter,
|
||||
RateLimited,
|
||||
Rename,
|
||||
SetFields,
|
||||
Tee,
|
||||
UnpackItems,
|
||||
count,
|
||||
identity,
|
||||
noop,
|
||||
check=_all_nodes,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@ -12,4 +12,6 @@ def transformation_factory(f):
|
||||
)
|
||||
return retval
|
||||
|
||||
_transformation_factory._partial = True
|
||||
|
||||
return _transformation_factory
|
||||
|
||||
@ -50,8 +50,11 @@ class ExecutorStrategy(Strategy):
|
||||
def starter(node):
|
||||
@functools.wraps(node)
|
||||
def _runner():
|
||||
with node:
|
||||
node.loop()
|
||||
try:
|
||||
with node:
|
||||
node.loop()
|
||||
except:
|
||||
logging.getLogger(__name__).critical('Critical error in threadpool node starter.', exc_info=sys.exc_info())
|
||||
|
||||
try:
|
||||
futures.append(executor.submit(_runner))
|
||||
|
||||
@ -14,10 +14,13 @@ from mondrian import term
|
||||
|
||||
__all__ = [
|
||||
'FixedWindow',
|
||||
'Format',
|
||||
'Limit',
|
||||
'PrettyPrinter',
|
||||
'Tee',
|
||||
'Rename',
|
||||
'SetFields',
|
||||
'Tee',
|
||||
'UnpackItems',
|
||||
'count',
|
||||
'identity',
|
||||
'noop',
|
||||
|
||||
@ -22,10 +22,20 @@ def isconfigurabletype(mixed, *, strict=False):
|
||||
:return: bool
|
||||
"""
|
||||
from bonobo.config.configurables import ConfigurableMeta, PartiallyConfigured
|
||||
return isinstance(mixed, (ConfigurableMeta, ) if strict else (
|
||||
ConfigurableMeta,
|
||||
PartiallyConfigured,
|
||||
))
|
||||
|
||||
if isinstance(mixed, ConfigurableMeta):
|
||||
return True
|
||||
|
||||
if strict:
|
||||
return False
|
||||
|
||||
if isinstance(mixed, PartiallyConfigured):
|
||||
return True
|
||||
|
||||
if hasattr(mixed, '_partial') and mixed._partial:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def isoption(mixed):
|
||||
|
||||
Reference in New Issue
Block a user