diff --git a/benchmarks/parameters.py b/benchmarks/parameters.py index 9098fde..f51e389 100644 --- a/benchmarks/parameters.py +++ b/benchmarks/parameters.py @@ -14,31 +14,43 @@ k3 1.353256585993222 import json import timeit + def j1(d): return {'prepend': 'foo', **d, 'append': 'bar'} + def k1(**d): return {'prepend': 'foo', **d, 'append': 'bar'} + def j2(d): return {**d} + def k2(**d): return {**d} + def j3(d): return None + def k3(**d): return None + if __name__ == '__main__': import timeit with open('person.json') as f: json_data = json.load(f) - - for i in 1,2,3: - print('j{}'.format(i), timeit.timeit("j{}({!r})".format(i, json_data), setup="from __main__ import j{}".format(i))) - print('k{}'.format(i), timeit.timeit("k{}(**{!r})".format(i, json_data), setup="from __main__ import k{}".format(i))) + for i in 1, 2, 3: + print( + 'j{}'.format(i), + timeit.timeit("j{}({!r})".format(i, json_data), setup="from __main__ import j{}".format(i)) + ) + print( + 'k{}'.format(i), + timeit.timeit("k{}(**{!r})".format(i, json_data), setup="from __main__ import k{}".format(i)) + ) diff --git a/bonobo/_api.py b/bonobo/_api.py index 925e6ce..e7362dd 100644 --- a/bonobo/_api.py +++ b/bonobo/_api.py @@ -1,6 +1,26 @@ from bonobo.execution.strategies import create_strategy -from bonobo.nodes import CsvReader, CsvWriter, FileReader, FileWriter, Filter, JsonReader, JsonWriter, Limit, \ - PickleReader, PickleWriter, PrettyPrinter, RateLimited, Tee, arg0_to_kwargs, count, identity, kwargs_to_arg0, noop +from bonobo.nodes import ( + CsvReader, + CsvWriter, + FileReader, + FileWriter, + Filter, + FixedWindow, + JsonReader, + JsonWriter, + Limit, + PickleReader, + PickleWriter, + PrettyPrinter, + RateLimited, + Tee, + Update, + arg0_to_kwargs, + count, + identity, + kwargs_to_arg0, + noop, +) from bonobo.nodes import LdjsonReader, LdjsonWriter from bonobo.structs import Bag, ErrorBag, Graph, Token from bonobo.util import get_name @@ -25,8 +45,10 @@ def register_graph_api(x, 
__all__=__all__): required_parameters = {'plugins', 'services', 'strategy'} assert parameters[0] == 'graph', 'First parameter of a graph api function must be "graph".' assert required_parameters.intersection( - parameters) == required_parameters, 'Graph api functions must define the following parameters: ' + ', '.join( - sorted(required_parameters)) + parameters + ) == required_parameters, 'Graph api functions must define the following parameters: ' + ', '.join( + sorted(required_parameters) + ) return register_api(x, __all__=__all__) @@ -149,6 +171,7 @@ register_api_group( FileReader, FileWriter, Filter, + FixedWindow, JsonReader, JsonWriter, LdjsonReader, @@ -159,6 +182,7 @@ register_api_group( PrettyPrinter, RateLimited, Tee, + Update, arg0_to_kwargs, count, identity, diff --git a/bonobo/contrib/django/commands.py b/bonobo/contrib/django/commands.py index 11ec680..457273d 100644 --- a/bonobo/contrib/django/commands.py +++ b/bonobo/contrib/django/commands.py @@ -22,9 +22,7 @@ class ETLCommand(BaseCommand): create_or_update = staticmethod(create_or_update) def create_parser(self, prog_name, subcommand): - return bonobo.get_argument_parser( - super().create_parser(prog_name, subcommand) - ) + return bonobo.get_argument_parser(super().create_parser(prog_name, subcommand)) def get_graph(self, *args, **options): def not_implemented(): diff --git a/bonobo/contrib/jupyter/__init__.py b/bonobo/contrib/jupyter/__init__.py index 7e5f892..49242be 100644 --- a/bonobo/contrib/jupyter/__init__.py +++ b/bonobo/contrib/jupyter/__init__.py @@ -2,12 +2,7 @@ from bonobo.plugins.jupyter import JupyterOutputPlugin def _jupyter_nbextension_paths(): - return [{ - 'section': 'notebook', - 'src': 'static', - 'dest': 'bonobo-jupyter', - 'require': 'bonobo-jupyter/extension' - }] + return [{'section': 'notebook', 'src': 'static', 'dest': 'bonobo-jupyter', 'require': 'bonobo-jupyter/extension'}] __all__ = [ diff --git a/bonobo/execution/strategies/executor.py 
b/bonobo/execution/strategies/executor.py index ea6c08e..f99c4cc 100644 --- a/bonobo/execution/strategies/executor.py +++ b/bonobo/execution/strategies/executor.py @@ -10,6 +10,7 @@ from bonobo.util import get_name logger = logging.getLogger(__name__) + class ExecutorStrategy(Strategy): """ Strategy based on a concurrent.futures.Executor subclass (or similar interface). diff --git a/bonobo/nodes/basics.py b/bonobo/nodes/basics.py index fa74e40..4054d3d 100644 --- a/bonobo/nodes/basics.py +++ b/bonobo/nodes/basics.py @@ -4,16 +4,17 @@ import itertools from bonobo import settings from bonobo.config import Configurable, Option from bonobo.config.processors import ContextProcessor +from bonobo.constants import NOT_MODIFIED from bonobo.structs.bags import Bag from bonobo.util.objects import ValueHolder from bonobo.util.term import CLEAR_EOL -from bonobo.constants import NOT_MODIFIED - __all__ = [ + 'FixedWindow', 'Limit', 'PrettyPrinter', 'Tee', + 'Update', 'arg0_to_kwargs', 'count', 'identity', @@ -128,3 +129,49 @@ def kwargs_to_arg0(**row): :return: bonobo.Bag """ return Bag(row) + + +def Update(*consts, **kwconsts): + """ + Transformation factory to update a stream with constant values, by appending to args and updating kwargs. + + :param consts: what to append to the input stream args + :param kwconsts: what to use to update input stream kwargs + :return: function + + """ + + def update(*args, **kwargs): + nonlocal consts, kwconsts + return (*args, *consts, {**kwargs, **kwconsts}) + + update.__name__ = 'Update({})'.format(Bag.format_args(*consts, **kwconsts)) + + return update + + +class FixedWindow(Configurable): + """ + Transformation factory to create fixed windows of inputs, as lists. + + For example, if the input is successively 1, 2, 3, 4, etc. and you pass it through a ``FixedWindow(2)``, you'll get + lists of elements 2 by 2: [1, 2], [3, 4], ... 
+ + """ + + length = Option(int, positional=True) # type: int + + @ContextProcessor + def buffer(self, context): + buffer = yield ValueHolder([]) + if len(buffer): + context.send(Bag(buffer.get())) + + def call(self, buffer, x): + buffer.append(x) + if len(buffer) >= self.length: + yield buffer.get() + buffer.set([]) + + + diff --git a/bonobo/plugins/jupyter.py b/bonobo/plugins/jupyter.py index 9049964..245ac95 100644 --- a/bonobo/plugins/jupyter.py +++ b/bonobo/plugins/jupyter.py @@ -30,6 +30,4 @@ class JupyterOutputPlugin(Plugin): IPython.core.display.display(self.widget) def tick(self, event): - self.widget.value = [ - event.context[i].as_dict() for i in event.context.graph.topologically_sorted_indexes - ] + self.widget.value = [event.context[i].as_dict() for i in event.context.graph.topologically_sorted_indexes] diff --git a/bonobo/structs/bags.py b/bonobo/structs/bags.py index 8683175..f303f92 100644 --- a/bonobo/structs/bags.py +++ b/bonobo/structs/bags.py @@ -1,7 +1,7 @@ import itertools -from bonobo.structs.tokens import Token from bonobo.constants import INHERIT_INPUT, LOOPBACK +from bonobo.structs.tokens import Token __all__ = [ 'Bag', @@ -36,6 +36,10 @@ class Bag: default_flags = () + @staticmethod + def format_args(*args, **kwargs): + return ', '.join(itertools.chain(map(repr, args), ('{}={!r}'.format(k, v) for k, v in kwargs.items()))) + def __new__(cls, *args, _flags=None, _parent=None, **kwargs): # Handle the special case where we call Bag's constructor with only one bag or token as argument. 
if len(args) == 1 and len(kwargs) == 0: @@ -86,6 +90,9 @@ class Bag: self._args = args self._kwargs = kwargs + def __repr__(self): + return '{}({})'.format(type(self).__name__, Bag.format_args(*self.args, **self.kwargs)) + @property def args(self): if self._parent is None: @@ -141,7 +148,7 @@ class Bag: @classmethod def inherit(cls, *args, **kwargs): - return cls(*args, _flags=(INHERIT_INPUT, ), **kwargs) + return cls(*args, _flags=(INHERIT_INPUT,), **kwargs) def __eq__(self, other): # XXX there are overlapping cases, but this is very handy for now. Let's think about it later. @@ -169,19 +176,9 @@ class Bag: return len(self.args) == 1 and not self.kwargs and self.args[0] == other - def __repr__(self): - return '<{} ({})>'.format( - type(self).__name__, ', '.join( - itertools.chain( - map(repr, self.args), - ('{}={}'.format(k, repr(v)) for k, v in self.kwargs.items()), - ) - ) - ) - class LoopbackBag(Bag): - default_flags = (LOOPBACK, ) + default_flags = (LOOPBACK,) class ErrorBag(Bag): diff --git a/bonobo/util/objects.py b/bonobo/util/objects.py index 209f4db..f3ffa5e 100644 --- a/bonobo/util/objects.py +++ b/bonobo/util/objects.py @@ -142,10 +142,10 @@ class ValueHolder: return divmod(other, self._value) def __pow__(self, other): - return self._value ** other + return self._value**other def __rpow__(self, other): - return other ** self._value + return other**self._value def __ipow__(self, other): self._value **= other diff --git a/tests/io/test_csv.py b/tests/nodes/io/test_csv.py similarity index 100% rename from tests/io/test_csv.py rename to tests/nodes/io/test_csv.py diff --git a/tests/io/test_file.py b/tests/nodes/io/test_file.py similarity index 100% rename from tests/io/test_file.py rename to tests/nodes/io/test_file.py diff --git a/tests/io/test_json.py b/tests/nodes/io/test_json.py similarity index 100% rename from tests/io/test_json.py rename to tests/nodes/io/test_json.py diff --git a/tests/io/test_pickle.py b/tests/nodes/io/test_pickle.py similarity index 100% rename from 
tests/io/test_pickle.py rename to tests/nodes/io/test_pickle.py diff --git a/tests/test_basics.py b/tests/nodes/test_basics.py similarity index 55% rename from tests/test_basics.py rename to tests/nodes/test_basics.py index 283e3d7..de72069 100644 --- a/tests/test_basics.py +++ b/tests/nodes/test_basics.py @@ -1,3 +1,4 @@ +from operator import methodcaller from unittest.mock import MagicMock import pytest @@ -5,6 +6,7 @@ import pytest import bonobo from bonobo.config.processors import ContextCurrifier from bonobo.constants import NOT_MODIFIED +from bonobo.util.testing import BufferingNodeExecutionContext def test_count(): @@ -72,3 +74,38 @@ def test_tee(): def test_noop(): assert bonobo.noop(1, 2, 3, 4, foo='bar') == NOT_MODIFIED + + +def test_update(): + with BufferingNodeExecutionContext(bonobo.Update('a', k=True)) as context: + context.write_sync('a', ('a', {'b': 1}), ('b', {'k': False})) + assert context.get_buffer() == [ + bonobo.Bag('a', 'a', k=True), + bonobo.Bag('a', 'a', b=1, k=True), + bonobo.Bag('b', 'a', k=True), + ] + assert context.name == "Update('a', k=True)" + + +def test_fixedwindow(): + with BufferingNodeExecutionContext(bonobo.FixedWindow(2)) as context: + context.write_sync(*range(10)) + assert context.get_buffer() == [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]] + + with BufferingNodeExecutionContext(bonobo.FixedWindow(2)) as context: + context.write_sync(*range(9)) + assert context.get_buffer() == [[0, 1], [2, 3], [4, 5], [6, 7], [8]] + + with BufferingNodeExecutionContext(bonobo.FixedWindow(1)) as context: + context.write_sync(*range(3)) + assert context.get_buffer() == [[0], [1], [2]] + + +def test_methodcaller(): + with BufferingNodeExecutionContext(methodcaller('swapcase')) as context: + context.write_sync('aaa', 'bBb', 'CcC') + assert context.get_buffer() == ['AAA', 'BbB', 'cCc'] + + with BufferingNodeExecutionContext(methodcaller('zfill', 5)) as context: + context.write_sync('a', 'bb', 'ccc') + assert context.get_buffer() == ['0000a', 
'000bb', '00ccc'] diff --git a/tests/structs/test_bags.py b/tests/structs/test_bags.py index b5517e3..6a13bbf 100644 --- a/tests/structs/test_bags.py +++ b/tests/structs/test_bags.py @@ -159,7 +159,7 @@ def test_eq_operator_dict(): def test_repr(): bag = Bag('a', a=1) - assert repr(bag) == "" + assert repr(bag) == "Bag('a', a=1)" def test_iterator():