diff --git a/bonobo/_api.py b/bonobo/_api.py index 5f08ae3..0df8594 100644 --- a/bonobo/_api.py +++ b/bonobo/_api.py @@ -10,7 +10,7 @@ to another is maximal. from bonobo.execution.strategies import create_strategy from bonobo.nodes import * from bonobo.nodes import __all__ as _all_nodes -from bonobo.structs import Graph +from bonobo.structs.graphs import Graph from bonobo.util.api import ApiHelper from bonobo.util.environ import parse_args, get_argument_parser from bonobo.registry import create_reader, create_writer diff --git a/bonobo/constants.py b/bonobo/constants.py index fde06de..056de9f 100644 --- a/bonobo/constants.py +++ b/bonobo/constants.py @@ -20,30 +20,13 @@ """ - - -class Token: - def __init__(self, name): - self.__name__ = name - - def __repr__(self): - return '<{}>'.format(self.__name__) - - -class Flag(Token): - must_be_first = False - must_be_last = False - allows_data = True - +from bonobo.structs.tokens import Token +from bonobo.util.envelopes import UnchangedEnvelope BEGIN = Token('Begin') END = Token('End') -INHERIT = Flag('Inherit') -NOT_MODIFIED = Flag('NotModified') -NOT_MODIFIED.must_be_first = True -NOT_MODIFIED.must_be_last = True -NOT_MODIFIED.allows_data = False +NOT_MODIFIED = UnchangedEnvelope() EMPTY = tuple() diff --git a/bonobo/execution/contexts/graph.py b/bonobo/execution/contexts/graph.py index ffc9dcc..59c9063 100644 --- a/bonobo/execution/contexts/graph.py +++ b/bonobo/execution/contexts/graph.py @@ -4,7 +4,7 @@ from queue import Empty from time import sleep from bonobo.config import create_container -from bonobo.constants import BEGIN, END +from bonobo.constants import BEGIN, END, EMPTY from bonobo.errors import InactiveReadableError from bonobo.execution import events from bonobo.execution.contexts.base import BaseContext @@ -120,6 +120,10 @@ class GraphExecutionContext(BaseContext): except InactiveReadableError: nodes.discard(node) + def run_until_complete(self): + self.write(BEGIN, EMPTY, END) + self.loop() + def 
stop(self, stopper=None): super(GraphExecutionContext, self).stop() diff --git a/bonobo/execution/contexts/node.py b/bonobo/execution/contexts/node.py index 44476f0..dd4d138 100644 --- a/bonobo/execution/contexts/node.py +++ b/bonobo/execution/contexts/node.py @@ -7,12 +7,14 @@ from types import GeneratorType from bonobo.config import create_container from bonobo.config.processors import ContextCurrifier -from bonobo.constants import NOT_MODIFIED, BEGIN, END, TICK_PERIOD, Token, Flag, INHERIT +from bonobo.constants import BEGIN, END, TICK_PERIOD from bonobo.errors import InactiveReadableError, UnrecoverableError, UnrecoverableTypeError from bonobo.execution.contexts.base import BaseContext from bonobo.structs.inputs import Input -from bonobo.util import get_name, isconfigurabletype, ensure_tuple +from bonobo.structs.tokens import Token, Flag +from bonobo.util import get_name, isconfigurabletype, ensure_tuple, deprecated from bonobo.util.bags import BagType +from bonobo.util.envelopes import isenvelope, F_NOT_MODIFIED, F_INHERIT from bonobo.util.statistics import WithStatistics logger = logging.getLogger(__name__) @@ -329,17 +331,29 @@ class NodeExecutionContext(BaseContext, WithStatistics): :return: Bag """ - tokens, _output = split_token(_output) + if isenvelope(_output): + _output, _flags, _options = _output.unfold() + else: + _flags, _options = [], {} - if NOT_MODIFIED in tokens: - return ensure_tuple(_input, cls=(self.output_type or tuple)) + if len(_flags): + # TODO: parse flags to check constraints are respected (like not modified alone, etc.) 
- if INHERIT in tokens: - if self._output_type is None: - self._output_type = concat_types(self._input_type, self._input_length, self._output_type, len(_output)) - _output = _input + ensure_tuple(_output) + if F_NOT_MODIFIED in _flags: + return _input - return ensure_tuple(_output, cls=(self._output_type or tuple)) + if F_INHERIT in _flags: + if self._output_type is None: + self._output_type = concat_types( + self._input_type, self._input_length, self._output_type, len(_output) + ) + _output = _input + ensure_tuple(_output) + + if not self._output_type: + if issubclass(type(_output), tuple): + self._output_type = type(_output) + + return ensure_tuple(_output, cls=self._output_type) def _send(self, value, _control=False): """ @@ -367,6 +381,7 @@ def isflag(param): return isinstance(param, Flag) +@deprecated def split_token(output): """ Split an output into token tuple, real output tuple. @@ -392,6 +407,7 @@ def split_token(output): output = output[i:] if not data_allowed and len(output): raise ValueError('Output data provided after a flag that does not allow data.') + return flags, output diff --git a/bonobo/structs/__init__.py b/bonobo/structs/__init__.py index ba640c9..e69de29 100644 --- a/bonobo/structs/__init__.py +++ b/bonobo/structs/__init__.py @@ -1,5 +0,0 @@ -from bonobo.structs.graphs import Graph - -__all__ = [ - 'Graph', -] diff --git a/bonobo/structs/tokens.py b/bonobo/structs/tokens.py new file mode 100644 index 0000000..9b8f151 --- /dev/null +++ b/bonobo/structs/tokens.py @@ -0,0 +1,12 @@ +class Token: + def __init__(self, name): + self.__name__ = name + + def __repr__(self): + return '<{}>'.format(self.__name__) + + +class Flag(Token): + must_be_first = False + must_be_last = False + allows_data = True diff --git a/bonobo/util/collections.py b/bonobo/util/collections.py index 1f72e8c..1234142 100644 --- a/bonobo/util/collections.py +++ b/bonobo/util/collections.py @@ -3,7 +3,17 @@ import functools class sortedlist(list): + """ + A list with an 
insort() method that can be used to maintain sorted lists. The list by itself is not sorted, it's + up to the user to not insert unsorted elements. + """ + def insort(self, x): + """ + If the list is sorted, insert the element in the right place. Otherwise, unpredictable behaviour. + + :param x: + """ bisect.insort(self, x) diff --git a/bonobo/util/envelopes.py b/bonobo/util/envelopes.py new file mode 100644 index 0000000..04f8080 --- /dev/null +++ b/bonobo/util/envelopes.py @@ -0,0 +1,32 @@ +from bonobo.structs.tokens import Flag + +F_INHERIT = Flag('Inherit') + +F_NOT_MODIFIED = Flag('NotModified') +F_NOT_MODIFIED.must_be_first = True +F_NOT_MODIFIED.must_be_last = True +F_NOT_MODIFIED.allows_data = False + + +class Envelope: + def __init__(self, content, *, flags=None, **options): + self._content = content + self._flags = set(flags or ()) + self._options = options + + def unfold(self): + return self._content, self._flags, self._options + + +class AppendingEnvelope(Envelope): + def __init__(self, content, **options): + super().__init__(content, flags={F_INHERIT}, **options) + + +class UnchangedEnvelope(Envelope): + def __init__(self, **options): + super().__init__(None, flags={F_NOT_MODIFIED}, **options) + + +def isenvelope(mixed): + return isinstance(mixed, Envelope) diff --git a/bonobo/util/testing.py b/bonobo/util/testing.py index 8000e89..19dd472 100644 --- a/bonobo/util/testing.py +++ b/bonobo/util/testing.py @@ -11,7 +11,7 @@ import pytest from bonobo import open_fs, __main__, get_examples_path from bonobo.commands import entrypoint -from bonobo.constants import Token +from bonobo.structs.tokens import Token from bonobo.execution.contexts.graph import GraphExecutionContext from bonobo.execution.contexts.node import NodeExecutionContext diff --git a/tests/examples/test_example_change_some_fields.py b/tests/examples/test_example_change_some_fields.py new file mode 100644 index 0000000..8fd5ff0 --- /dev/null +++ 
b/tests/examples/test_example_change_some_fields.py @@ -0,0 +1,62 @@ +from collections import namedtuple + +import bonobo +from bonobo.config import use_raw_input +from bonobo.execution.contexts import GraphExecutionContext +from bonobo.util.bags import BagType + +Extracted = namedtuple('Extracted', ['id', 'name', 'value']) +ExtractedBT = BagType('ExtractedBT', ['id', 'name', 'value']) + + +def extract_nt(): + yield Extracted(id=1, name='Guido', value='.py') + yield Extracted(id=2, name='Larry', value='.pl') + yield Extracted(id=3, name='Dennis', value='.c') + yield Extracted(id=4, name='Yukihiro', value='.rb') + + +def extract_bt(): + yield ExtractedBT(id=1, name='Guido', value='.py') + yield ExtractedBT(id=2, name='Larry', value='.pl') + yield ExtractedBT(id=3, name='Dennis', value='.c') + yield ExtractedBT(id=4, name='Yukihiro', value='.rb') + + +def transform_using_args(id, name, value): + yield Extracted(id=id * 2, name=name, value=name.lower() + value) + + +@use_raw_input +def transform_nt(row): + yield row._replace(name=row.name.upper()) + + +def StoreInList(buffer: list): + def store_in_list(*args, buffer=buffer): + buffer.append(args) + + return store_in_list + + +def test_execution(): + graph = bonobo.Graph() + + result_args = [] + result_nt = [] + result_bt = [] + + graph.add_chain(extract_nt, transform_using_args, StoreInList(result_args)) + graph.add_chain(transform_nt, StoreInList(result_nt), _input=extract_nt) + graph.add_chain(extract_bt, transform_using_args, StoreInList(result_bt)) + + with GraphExecutionContext(graph) as context: + context.run_until_complete() + + assert result_args == [(2, 'Guido', 'guido.py'), (4, 'Larry', 'larry.pl'), (6, 'Dennis', 'dennis.c'), + (8, 'Yukihiro', 'yukihiro.rb')] + + assert result_nt == [(1, 'GUIDO', '.py'), (2, 'LARRY', '.pl'), (3, 'DENNIS', '.c'), (4, 'YUKIHIRO', '.rb')] + + assert result_bt == [(2, 'Guido', 'guido.py'), (4, 'Larry', 'larry.pl'), (6, 'Dennis', 'dennis.c'), + (8, 'Yukihiro', 'yukihiro.rb')] 
diff --git a/tests/execution/contexts/test_execution_contexts_node.py b/tests/execution/contexts/test_execution_contexts_node.py index 5af665d..ca9de41 100644 --- a/tests/execution/contexts/test_execution_contexts_node.py +++ b/tests/execution/contexts/test_execution_contexts_node.py @@ -3,9 +3,10 @@ from unittest.mock import MagicMock import pytest from bonobo import Graph -from bonobo.constants import EMPTY, NOT_MODIFIED, INHERIT +from bonobo.constants import EMPTY from bonobo.execution.contexts.node import NodeExecutionContext, split_token from bonobo.execution.strategies import NaiveStrategy +from bonobo.util.envelopes import F_NOT_MODIFIED, F_INHERIT from bonobo.util.testing import BufferingNodeExecutionContext, BufferingGraphExecutionContext @@ -227,32 +228,36 @@ def test_node_lifecycle_with_kill(): def test_split_token(): - assert split_token(('foo', 'bar')) == (set(), ('foo', 'bar')) - assert split_token(()) == (set(), ()) - assert split_token('') == (set(), ('', )) + with pytest.deprecated_call(): + assert split_token(('foo', 'bar')) == (set(), ('foo', 'bar')) + assert split_token(()) == (set(), ()) + assert split_token('') == (set(), ('', )) def test_split_token_duplicate(): - with pytest.raises(ValueError): - split_token((NOT_MODIFIED, NOT_MODIFIED)) - with pytest.raises(ValueError): - split_token((INHERIT, INHERIT)) - with pytest.raises(ValueError): - split_token((INHERIT, NOT_MODIFIED, INHERIT)) + with pytest.deprecated_call(): + with pytest.raises(ValueError): + split_token((F_NOT_MODIFIED, F_NOT_MODIFIED)) + with pytest.raises(ValueError): + split_token((F_INHERIT, F_INHERIT)) + with pytest.raises(ValueError): + split_token((F_INHERIT, F_NOT_MODIFIED, F_INHERIT)) def test_split_token_not_modified(): - with pytest.raises(ValueError): - split_token((NOT_MODIFIED, 'foo', 'bar')) - with pytest.raises(ValueError): - split_token((NOT_MODIFIED, INHERIT)) - with pytest.raises(ValueError): - split_token((INHERIT, NOT_MODIFIED)) - assert 
split_token(NOT_MODIFIED) == ({NOT_MODIFIED}, ()) - assert split_token((NOT_MODIFIED, )) == ({NOT_MODIFIED}, ()) + with pytest.deprecated_call(): + with pytest.raises(ValueError): + split_token((F_NOT_MODIFIED, 'foo', 'bar')) + with pytest.raises(ValueError): + split_token((F_NOT_MODIFIED, F_INHERIT)) + with pytest.raises(ValueError): + split_token((F_INHERIT, F_NOT_MODIFIED)) + assert split_token(F_NOT_MODIFIED) == ({F_NOT_MODIFIED}, ()) + assert split_token((F_NOT_MODIFIED, )) == ({F_NOT_MODIFIED}, ()) def test_split_token_inherit(): - assert split_token(INHERIT) == ({INHERIT}, ()) - assert split_token((INHERIT, )) == ({INHERIT}, ()) - assert split_token((INHERIT, 'foo', 'bar')) == ({INHERIT}, ('foo', 'bar')) + with pytest.deprecated_call(): + assert split_token(F_INHERIT) == ({F_INHERIT}, ()) + assert split_token((F_INHERIT, )) == ({F_INHERIT}, ()) + assert split_token((F_INHERIT, 'foo', 'bar')) == ({F_INHERIT}, ('foo', 'bar')) diff --git a/tests/features/test_inherit.py b/tests/features/test_inherit.py index 92b943b..63c6a5d 100644 --- a/tests/features/test_inherit.py +++ b/tests/features/test_inherit.py @@ -1,4 +1,4 @@ -from bonobo.constants import INHERIT +from bonobo.util.envelopes import AppendingEnvelope from bonobo.util.testing import BufferingNodeExecutionContext messages = [ @@ -8,7 +8,7 @@ messages = [ def append(*args): - return INHERIT, '!' 
+ return AppendingEnvelope('!') def test_inherit(): diff --git a/tests/features/test_not_modified.py b/tests/features/test_not_modified.py index 20f6f96..63b27d8 100644 --- a/tests/features/test_not_modified.py +++ b/tests/features/test_not_modified.py @@ -15,4 +15,6 @@ def test_not_modified(): with BufferingNodeExecutionContext(useless) as context: context.write_sync(*input_messages) - assert context.get_buffer() == input_messages + result = context.get_buffer() + print(result) + assert result == input_messages diff --git a/tests/nodes/test_casts.py b/tests/nodes/test_casts.py new file mode 100644 index 0000000..3eb3621 --- /dev/null +++ b/tests/nodes/test_casts.py @@ -0,0 +1,65 @@ +from collections import namedtuple +from typing import Callable + +import pytest + +from bonobo.constants import EMPTY +from bonobo.util.bags import BagType +from bonobo.util.envelopes import Envelope +from bonobo.util.testing import BufferingNodeExecutionContext + +MyTuple = namedtuple('MyTuple', ['a', 'b', 'c']) +MyBag = BagType('MyBag', ['a', 'b', 'c']) + + +class MyCustomType(): + def __init__(self, *args): + self.args = args + + def as_tuple(self): + return MyBag(*self.args) + + +@pytest.mark.parametrize(['factory', 'expected', 'expected_item0'], [ + [lambda: (1, 2, 3), tuple, int], + [lambda: Envelope((1, 2, 3)), tuple, int], + [lambda: MyTuple(1, 2, 3), MyTuple, int], + [lambda: Envelope(MyTuple(1, 2, 3)), MyTuple, int], + [lambda: MyBag(1, 2, 3), MyBag, int], + [lambda: Envelope(MyBag(1, 2, 3)), MyBag, int], + [lambda: MyCustomType(1, 2, 3), tuple, MyCustomType], + [lambda: Envelope(MyCustomType(1, 2, 3)), tuple, MyCustomType], +]) +def test_casts_after_output(factory: Callable, expected, expected_item0): + def transform(): + yield factory() + yield factory() + + with BufferingNodeExecutionContext(transform) as context: + context.write_sync(EMPTY) + + result = context.get_buffer() + assert expected == type(result[0]) + assert expected_item0 == type(result[0][0]) + assert 
expected == type(result[1]) + assert expected_item0 == type(result[1][0]) + + +def test_cast_after_returning_custom_type(): + def transform(): + yield MyCustomType(1, 2, 3) + yield MyCustomType(4, 5, 6) + + with BufferingNodeExecutionContext(transform) as context: + context.write_sync(EMPTY) + result = context.get_buffer() + assert tuple == type(result[0]) + assert tuple == type(result[1]) + assert MyCustomType == type(result[0][0]) + assert MyCustomType == type(result[1][0]) + + with BufferingNodeExecutionContext(MyCustomType.as_tuple) as context: + context.write_sync(*result) + result = context.get_buffer() + assert MyBag == type(result[0]) + assert MyBag == type(result[1]) diff --git a/tests/structs/test_graphs.py b/tests/structs/test_graphs.py index 7f3a58d..51321ae 100644 --- a/tests/structs/test_graphs.py +++ b/tests/structs/test_graphs.py @@ -3,7 +3,7 @@ import pytest from unittest.mock import sentinel from bonobo.constants import BEGIN -from bonobo.structs import Graph +from bonobo.structs.graphs import Graph identity = lambda x: x diff --git a/tests/structs/test_tokens.py b/tests/structs/test_tokens.py index e66b796..6bd9bee 100644 --- a/tests/structs/test_tokens.py +++ b/tests/structs/test_tokens.py @@ -1,4 +1,4 @@ -from bonobo.constants import Token +from bonobo.structs.tokens import Token def test_token_repr(): diff --git a/tests/test_execution.py b/tests/test_execution.py index 92cd30c..3f51c45 100644 --- a/tests/test_execution.py +++ b/tests/test_execution.py @@ -2,7 +2,7 @@ from bonobo.config.processors import use_context_processor from bonobo.constants import BEGIN, END from bonobo.execution.contexts.graph import GraphExecutionContext from bonobo.execution.strategies import NaiveStrategy -from bonobo.structs import Graph +from bonobo.structs.graphs import Graph def generate_integers():