[core] Change the token parsing part in prevision of different flags.
This commit is contained in:
@ -9,7 +9,7 @@ from bonobo.execution.base import LoopingExecutionContext
|
|||||||
from bonobo.structs.bags import Bag
|
from bonobo.structs.bags import Bag
|
||||||
from bonobo.structs.inputs import Input
|
from bonobo.structs.inputs import Input
|
||||||
from bonobo.structs.tokens import Token
|
from bonobo.structs.tokens import Token
|
||||||
from bonobo.util import get_name, iserrorbag, isloopbackbag, isbag
|
from bonobo.util import get_name, iserrorbag, isloopbackbag, isbag, istuple
|
||||||
from bonobo.util.compat import deprecated_alias
|
from bonobo.util.compat import deprecated_alias
|
||||||
from bonobo.util.statistics import WithStatistics
|
from bonobo.util.statistics import WithStatistics
|
||||||
|
|
||||||
@ -137,12 +137,47 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def _resolve(input_bag, output):
|
def isflag(param):
|
||||||
# NotModified means to send the input unmodified to output.
|
return isinstance(param, Token) and param in (NOT_MODIFIED,)
|
||||||
if output is NOT_MODIFIED:
|
|
||||||
return input_bag
|
|
||||||
|
|
||||||
|
|
||||||
|
def split_tokens(output):
|
||||||
|
"""
|
||||||
|
Split an output into token tuple, real output tuple.
|
||||||
|
|
||||||
|
:param output:
|
||||||
|
:return: tuple, tuple
|
||||||
|
"""
|
||||||
|
if isinstance(output, Token):
|
||||||
|
# just a flag
|
||||||
|
return (output,), ()
|
||||||
|
|
||||||
|
if not istuple(output):
|
||||||
|
# no flag
|
||||||
|
return (), (output,)
|
||||||
|
|
||||||
|
i = 0
|
||||||
|
while isflag(output[i]):
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
return output[:i], output[i:]
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve(input_bag, output):
|
||||||
|
"""
|
||||||
|
This function is key to how bonobo works (and internal, too). It transforms a pair of input/output into what is the
|
||||||
|
real output.
|
||||||
|
|
||||||
|
:param input_bag: Bag
|
||||||
|
:param output: mixed
|
||||||
|
:return: Bag
|
||||||
|
"""
|
||||||
if isbag(output):
|
if isbag(output):
|
||||||
return output
|
return output
|
||||||
|
|
||||||
return Bag(output)
|
tokens, output = split_tokens(output)
|
||||||
|
|
||||||
|
if len(tokens) == 1 and tokens[0] is NOT_MODIFIED:
|
||||||
|
return input_bag
|
||||||
|
|
||||||
|
return output if isbag(output) else Bag(output)
|
||||||
|
|||||||
@ -53,7 +53,6 @@ class LdjsonReader(FileReader):
|
|||||||
|
|
||||||
def read(self, fs, file):
|
def read(self, fs, file):
|
||||||
for line in file:
|
for line in file:
|
||||||
print(line)
|
|
||||||
yield self.loader(line)
|
yield self.loader(line)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
24
tests/features/test_not_modified.py
Normal file
24
tests/features/test_not_modified.py
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
from bonobo.constants import NOT_MODIFIED
|
||||||
|
from bonobo.util.testing import BufferingNodeExecutionContext
|
||||||
|
|
||||||
|
|
||||||
|
def useless(*args, **kwargs):
|
||||||
|
return NOT_MODIFIED
|
||||||
|
|
||||||
|
|
||||||
|
def test_not_modified():
|
||||||
|
input_messages = [
|
||||||
|
('foo', 'bar'),
|
||||||
|
{'foo': 'bar'},
|
||||||
|
('foo', {'bar': 'baz'}),
|
||||||
|
(),
|
||||||
|
]
|
||||||
|
|
||||||
|
with BufferingNodeExecutionContext(useless) as context:
|
||||||
|
context.write_sync(*input_messages)
|
||||||
|
|
||||||
|
assert context.get_buffer() == input_messages
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
Reference in New Issue
Block a user