[stdlib] Adds Update(...) and FixedWindow(...) the the standard nodes provided with bonobo.

This commit is contained in:
Romain Dorgueil
2017-11-12 10:06:15 +01:00
parent a97b1c5d2e
commit c2f17296f6
15 changed files with 147 additions and 38 deletions

View File

@ -14,31 +14,43 @@ k3 1.353256585993222
import json
import timeit
def j1(d):
return {'prepend': 'foo', **d, 'append': 'bar'}
def k1(**d):
return {'prepend': 'foo', **d, 'append': 'bar'}
def j2(d):
return {**d}
def k2(**d):
return {**d}
def j3(d):
return None
def k3(**d):
return None
if __name__ == '__main__':
import timeit
with open('person.json') as f:
json_data = json.load(f)
for i in 1,2,3:
print('j{}'.format(i), timeit.timeit("j{}({!r})".format(i, json_data), setup="from __main__ import j{}".format(i)))
print('k{}'.format(i), timeit.timeit("k{}(**{!r})".format(i, json_data), setup="from __main__ import k{}".format(i)))
for i in 1, 2, 3:
print(
'j{}'.format(i),
timeit.timeit("j{}({!r})".format(i, json_data), setup="from __main__ import j{}".format(i))
)
print(
'k{}'.format(i),
timeit.timeit("k{}(**{!r})".format(i, json_data), setup="from __main__ import k{}".format(i))
)

View File

@ -1,6 +1,26 @@
from bonobo.execution.strategies import create_strategy
from bonobo.nodes import CsvReader, CsvWriter, FileReader, FileWriter, Filter, JsonReader, JsonWriter, Limit, \
PickleReader, PickleWriter, PrettyPrinter, RateLimited, Tee, arg0_to_kwargs, count, identity, kwargs_to_arg0, noop
from bonobo.nodes import (
CsvReader,
CsvWriter,
FileReader,
FileWriter,
Filter,
FixedWindow,
JsonReader,
JsonWriter,
Limit,
PickleReader,
PickleWriter,
PrettyPrinter,
RateLimited,
Tee,
Update,
arg0_to_kwargs,
count,
identity,
kwargs_to_arg0,
noop,
)
from bonobo.nodes import LdjsonReader, LdjsonWriter
from bonobo.structs import Bag, ErrorBag, Graph, Token
from bonobo.util import get_name
@ -25,8 +45,10 @@ def register_graph_api(x, __all__=__all__):
required_parameters = {'plugins', 'services', 'strategy'}
assert parameters[0] == 'graph', 'First parameter of a graph api function must be "graph".'
assert required_parameters.intersection(
parameters) == required_parameters, 'Graph api functions must define the following parameters: ' + ', '.join(
sorted(required_parameters))
parameters
) == required_parameters, 'Graph api functions must define the following parameters: ' + ', '.join(
sorted(required_parameters)
)
return register_api(x, __all__=__all__)
@ -149,6 +171,7 @@ register_api_group(
FileReader,
FileWriter,
Filter,
FixedWindow,
JsonReader,
JsonWriter,
LdjsonReader,
@ -159,6 +182,7 @@ register_api_group(
PrettyPrinter,
RateLimited,
Tee,
Update,
arg0_to_kwargs,
count,
identity,

View File

@ -22,9 +22,7 @@ class ETLCommand(BaseCommand):
create_or_update = staticmethod(create_or_update)
def create_parser(self, prog_name, subcommand):
return bonobo.get_argument_parser(
super().create_parser(prog_name, subcommand)
)
return bonobo.get_argument_parser(super().create_parser(prog_name, subcommand))
def get_graph(self, *args, **options):
def not_implemented():

View File

@ -2,12 +2,7 @@ from bonobo.plugins.jupyter import JupyterOutputPlugin
def _jupyter_nbextension_paths():
return [{
'section': 'notebook',
'src': 'static',
'dest': 'bonobo-jupyter',
'require': 'bonobo-jupyter/extension'
}]
return [{'section': 'notebook', 'src': 'static', 'dest': 'bonobo-jupyter', 'require': 'bonobo-jupyter/extension'}]
__all__ = [

View File

@ -10,6 +10,7 @@ from bonobo.util import get_name
logger = logging.getLogger(__name__)
class ExecutorStrategy(Strategy):
"""
Strategy based on a concurrent.futures.Executor subclass (or similar interface).

View File

@ -4,16 +4,17 @@ import itertools
from bonobo import settings
from bonobo.config import Configurable, Option
from bonobo.config.processors import ContextProcessor
from bonobo.constants import NOT_MODIFIED
from bonobo.structs.bags import Bag
from bonobo.util.objects import ValueHolder
from bonobo.util.term import CLEAR_EOL
from bonobo.constants import NOT_MODIFIED
__all__ = [
'FixedWindow',
'Limit',
'PrettyPrinter',
'Tee',
'Update',
'arg0_to_kwargs',
'count',
'identity',
@ -128,3 +129,49 @@ def kwargs_to_arg0(**row):
:return: bonobo.Bag
"""
return Bag(row)
def Update(*consts, **kwconsts):
"""
Transformation factory to update a stream with constant values, by appending to args and updating kwargs.
:param consts: what to append to the input stream args
:param kwconsts: what to use to update input stream kwargs
:return: function
"""
def update(*args, **kwargs):
nonlocal consts, kwconsts
return (*args, *consts, {**kwargs, **kwconsts})
update.__name__ = 'Update({})'.format(Bag.format_args(*consts, **kwconsts))
return update
class FixedWindow(Configurable):
"""
Transformation factory to create fixed windows of inputs, as lists.
For example, if the input is successively 1, 2, 3, 4, etc. and you pass it through a ``FixedWindow(2)``, you'll get
lists of elements 2 by 2: [1, 2], [3, 4], ...
"""
length = Option(int, positional=True) # type: int
@ContextProcessor
def buffer(self, context):
buffer = yield ValueHolder([])
if len(buffer):
context.send(Bag(buffer.get()))
def call(self, buffer, x):
buffer.append(x)
if len(buffer) >= self.length:
yield buffer.get()
buffer.set([])

View File

@ -30,6 +30,4 @@ class JupyterOutputPlugin(Plugin):
IPython.core.display.display(self.widget)
def tick(self, event):
self.widget.value = [
event.context[i].as_dict() for i in event.context.graph.topologically_sorted_indexes
]
self.widget.value = [event.context[i].as_dict() for i in event.context.graph.topologically_sorted_indexes]

View File

@ -1,7 +1,7 @@
import itertools
from bonobo.structs.tokens import Token
from bonobo.constants import INHERIT_INPUT, LOOPBACK
from bonobo.structs.tokens import Token
__all__ = [
'Bag',
@ -36,6 +36,10 @@ class Bag:
default_flags = ()
@staticmethod
def format_args(*args, **kwargs):
return ', '.join(itertools.chain(map(repr, args), ('{}={!r}'.format(k, v) for k, v in kwargs.items())))
def __new__(cls, *args, _flags=None, _parent=None, **kwargs):
# Handle the special case where we call Bag's constructor with only one bag or token as argument.
if len(args) == 1 and len(kwargs) == 0:
@ -86,6 +90,9 @@ class Bag:
self._args = args
self._kwargs = kwargs
def __repr__(self):
return 'Bag({})'.format(Bag.format_args(*self.args, **self.kwargs))
@property
def args(self):
if self._parent is None:
@ -141,7 +148,7 @@ class Bag:
@classmethod
def inherit(cls, *args, **kwargs):
return cls(*args, _flags=(INHERIT_INPUT, ), **kwargs)
return cls(*args, _flags=(INHERIT_INPUT,), **kwargs)
def __eq__(self, other):
# XXX there are overlapping cases, but this is very handy for now. Let's think about it later.
@ -169,19 +176,9 @@ class Bag:
return len(self.args) == 1 and not self.kwargs and self.args[0] == other
def __repr__(self):
return '<{} ({})>'.format(
type(self).__name__, ', '.join(
itertools.chain(
map(repr, self.args),
('{}={}'.format(k, repr(v)) for k, v in self.kwargs.items()),
)
)
)
class LoopbackBag(Bag):
default_flags = (LOOPBACK, )
default_flags = (LOOPBACK,)
class ErrorBag(Bag):

View File

@ -142,10 +142,10 @@ class ValueHolder:
return divmod(other, self._value)
def __pow__(self, other):
return self._value ** other
return self._value**other
def __rpow__(self, other):
return other ** self._value
return other**self._value
def __ipow__(self, other):
self._value **= other

View File

@ -1,3 +1,4 @@
from operator import methodcaller
from unittest.mock import MagicMock
import pytest
@ -5,6 +6,7 @@ import pytest
import bonobo
from bonobo.config.processors import ContextCurrifier
from bonobo.constants import NOT_MODIFIED
from bonobo.util.testing import BufferingNodeExecutionContext
def test_count():
@ -72,3 +74,38 @@ def test_tee():
def test_noop():
assert bonobo.noop(1, 2, 3, 4, foo='bar') == NOT_MODIFIED
def test_update():
with BufferingNodeExecutionContext(bonobo.Update('a', k=True)) as context:
context.write_sync('a', ('a', {'b': 1}), ('b', {'k': False}))
assert context.get_buffer() == [
bonobo.Bag('a', 'a', k=True),
bonobo.Bag('a', 'a', b=1, k=True),
bonobo.Bag('b', 'a', k=True),
]
assert context.name == "Update('a', k=True)"
def test_fixedwindow():
with BufferingNodeExecutionContext(bonobo.FixedWindow(2)) as context:
context.write_sync(*range(10))
assert context.get_buffer() == [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
with BufferingNodeExecutionContext(bonobo.FixedWindow(2)) as context:
context.write_sync(*range(9))
assert context.get_buffer() == [[0, 1], [2, 3], [4, 5], [6, 7], [8]]
with BufferingNodeExecutionContext(bonobo.FixedWindow(1)) as context:
context.write_sync(*range(3))
assert context.get_buffer() == [[0], [1], [2]]
def test_methodcaller():
with BufferingNodeExecutionContext(methodcaller('swapcase')) as context:
context.write_sync('aaa', 'bBb', 'CcC')
assert context.get_buffer() == ['AAA', 'BbB', 'cCc']
with BufferingNodeExecutionContext(methodcaller('zfill', 5)) as context:
context.write_sync('a', 'bb', 'ccc')
assert context.get_buffer() == ['0000a', '000bb', '00ccc']

View File

@ -159,7 +159,7 @@ def test_eq_operator_dict():
def test_repr():
bag = Bag('a', a=1)
assert repr(bag) == "<Bag ('a', a=1)>"
assert repr(bag) == "Bag('a', a=1)"
def test_iterator():