diff --git a/.style.yapf b/.style.yapf index c9a88d5..eaa5a7a 100644 --- a/.style.yapf +++ b/.style.yapf @@ -1,3 +1,4 @@ [style] based_on_style = pep8 column_limit = 120 +dedent_closing_brackets = true diff --git a/Makefile b/Makefile index a26569b..1db98f5 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ # This file has been auto-generated. # All changes will be lost, see Projectfile. # -# Updated at 2016-12-29 17:33:45.585172 +# Updated at 2017-01-03 12:10:41.605435 PYTHON ?= $(shell which python) PYTHON_BASENAME ?= $(shell basename $(PYTHON)) @@ -24,13 +24,13 @@ YAPF_OPTIONS ?= -rip # Installs the local project dependencies. install: $(VIRTUAL_ENV) if [ -z "$(QUICK)" ]; then \ - $(PIP) install -Ur $(PYTHON_REQUIREMENTS_FILE) ; \ + $(PIP) install -U pip wheel -r $(PYTHON_REQUIREMENTS_FILE) ; \ fi # Installs the local project dependencies, including development-only libraries. install-dev: $(VIRTUAL_ENV) if [ -z "$(QUICK)" ]; then \ - $(PIP) install -Ur $(PYTHON_REQUIREMENTS_DEV_FILE) ; \ + $(PIP) install -U pip wheel -r $(PYTHON_REQUIREMENTS_DEV_FILE) ; \ fi # Cleans up the local mess. @@ -41,7 +41,7 @@ clean: # Setup the local virtualenv, or use the one provided by the current environment. $(VIRTUAL_ENV): virtualenv -p $(PYTHON) $(VIRTUAL_ENV) - $(PIP) install -U pip\>=9,\<10 wheel\>=0.29,\<1.0 + $(PIP) install -U pip wheel ln -fs $(VIRTUAL_ENV)/bin/activate activate-$(PYTHON_BASENAME) lint: install-dev diff --git a/Projectfile b/Projectfile index b4ea177..7b3ad10 100644 --- a/Projectfile +++ b/Projectfile @@ -23,6 +23,7 @@ enable_features = { install_requires = [ 'blessings >=1.6,<1.7', 'psutil >=5.0,<5.1', + 'toolz >=0.8,<0.9', ] extras_require = { diff --git a/bonobo/__init__.py b/bonobo/__init__.py index 685e08d..e82abba 100644 --- a/bonobo/__init__.py +++ b/bonobo/__init__.py @@ -43,11 +43,12 @@ __all__ = [ 'ThreadPoolExecutorStrategy', '__version__', 'console_run', - 'head', 'inject', 'jupyter_run', + 'limit', 'log', 'noop', + 'pprint', 'run', 'service', 'tee', diff --git a/bonobo/core/__init__.py b/bonobo/core/__init__.py index cca4c91..cc66195 100644 --- a/bonobo/core/__init__.py +++ b/bonobo/core/__init__.py @@ -1,6 +1,6 @@ """ Core required libraries. """ -from .bags import Bag +from .bags import Bag, ErrorBag from .graphs import Graph from .services import inject, service from .strategies.executor import ThreadPoolExecutorStrategy, ProcessPoolExecutorStrategy @@ -8,6 +8,7 @@ from .strategies.naive import NaiveStrategy __all__ = [ 'Bag', + 'ErrorBag', 'Graph', 'NaiveStrategy', 'ProcessPoolExecutorStrategy', diff --git a/bonobo/core/bags.py b/bonobo/core/bags.py index c4f7c7b..7f695ee 100644 --- a/bonobo/core/bags.py +++ b/bonobo/core/bags.py @@ -2,6 +2,11 @@ import itertools from bonobo.util.tokens import Token +__all__ = [ + 'Bag', + 'ErrorBag', +] + INHERIT_INPUT = Token('InheritInput') @@ -18,7 +23,8 @@ class Bag: return self._args return ( *self._parent.args, - *self._args, ) + *self._args, + ) @property def kwargs(self): @@ -34,7 +40,24 @@ class Bag: return self._flags def apply(self, func_or_iter, *args, **kwargs): - return func_or_iter(*args, *self.args, **kwargs, **self.kwargs) + if callable(func_or_iter): + return func_or_iter(*args, *self.args, **kwargs, **self.kwargs) + + if len(args) == 0 and len(kwargs) == 0: + try: + iter(func_or_iter) + + def generator(): + nonlocal func_or_iter + for x in func_or_iter: + yield x + + return generator() + except TypeError as exc: + print('nop') + raise TypeError('Could not apply bag to {}.'.format(func_or_iter)) from exc + + raise TypeError('Could not apply bag to {}.'.format(func_or_iter)) def extend(self, *args, **kwargs): return type(self)(*args, _parent=self, **kwargs) @@ -48,7 +71,13 @@ class Bag: def __repr__(self): return '<{} ({})>'.format( - type(self).__name__, ', '.join( - itertools.chain( - map(repr, self.args), - ('{}={}'.format(k, repr(v)) for k, v in self.kwargs.items()), ))) + type(self).__name__, ', '. + join(itertools.chain( + map(repr, self.args), + ('{}={}'.format(k, repr(v)) for k, v in self.kwargs.items()), + )) + ) + + +class ErrorBag(Bag): + pass diff --git a/bonobo/core/contexts.py b/bonobo/core/contexts.py index 6a4d7f0..8a66863 100644 --- a/bonobo/core/contexts.py +++ b/bonobo/core/contexts.py @@ -3,7 +3,7 @@ from functools import partial from queue import Empty from time import sleep -from bonobo.core.bags import Bag, INHERIT_INPUT +from bonobo.core.bags import Bag, INHERIT_INPUT, ErrorBag from bonobo.core.errors import InactiveReadableError from bonobo.core.inputs import Input from bonobo.core.stats import WithStatistics @@ -11,11 +11,31 @@ from bonobo.util.lifecycle import get_initializer, get_finalizer from bonobo.util.tokens import BEGIN, END, NEW, RUNNING, TERMINATED, NOT_MODIFIED +def get_name(mixed): + try: + return mixed.__name__ + except AttributeError: + return type(mixed).__name__ + + +def create_component_context(component, parent): + try: + CustomComponentContext = component.Context + except AttributeError: + return ComponentExecutionContext(component, parent=parent) + + if ComponentExecutionContext in CustomComponentContext.__mro__: + bases = (CustomComponentContext, ) + else: + bases = (CustomComponentContext, ComponentExecutionContext) + + return type(get_name(component).title() + 'ExecutionContext', bases, {})(component, parent=parent) + + class ExecutionContext: def __init__(self, graph, plugins=None): self.graph = graph - self.components = [ComponentExecutionContext(component, self) for component in self.graph.components] - + self.components = [create_component_context(component, parent=self) for component in self.graph.components] self.plugins = [PluginExecutionContext(plugin, parent=self) for plugin in plugins or ()] for i, component_context in enumerate(self): @@ -23,8 +43,10 @@ class ExecutionContext: component_context.outputs = [self[j].input for j in self.graph.outputs_of(i)] except KeyError: continue + component_context.input.on_begin = partial(component_context.send, BEGIN, _control=True) component_context.input.on_end = partial(component_context.send, END, _control=True) + component_context.input.on_finalize = partial(component_context.finalize) def __getitem__(self, item): return self.components[item] @@ -63,17 +85,19 @@ class AbstractLoopContext: def initialize(self): # pylint: disable=broad-except try: - get_initializer(self.wrapped)(self) + initializer = get_initializer(self.wrapped) except Exception as exc: self.handle_error(exc, traceback.format_exc()) + else: + return initializer(self) def loop(self): """Generic loop. A bit boring. """ while self.alive: - self._loop() + self.step() sleep(self.PERIOD) - def _loop(self): + def step(self): """ TODO xxx this is a step, not a loop """ @@ -83,9 +107,11 @@ class AbstractLoopContext: """Generic finalizer. """ # pylint: disable=broad-except try: - get_finalizer(self.wrapped)(self) + finalizer = get_finalizer(self.wrapped) except Exception as exc: - self.handle_error(exc, traceback.format_exc()) + return self.handle_error(exc, traceback.format_exc()) + else: + return finalizer(self) def handle_error(self, exc, trace): """ @@ -112,7 +138,7 @@ class PluginExecutionContext(AbstractLoopContext): def shutdown(self): self.alive = False - def _loop(self): + def step(self): try: self.wrapped.run(self) except Exception as exc: # pylint: disable=broad-except @@ -157,13 +183,17 @@ class ComponentExecutionContext(WithStatistics, AbstractLoopContext): return ( ( 'in', - self.stats['in'], ), + self.stats['in'], + ), ( 'out', - self.stats['out'], ), + self.stats['out'], + ), ( 'err', - self.stats['err'], ), ) + self.stats['err'], + ), + ) def recv(self, *messages): """ @@ -195,7 +225,7 @@ class ComponentExecutionContext(WithStatistics, AbstractLoopContext): self.stats['in'] += 1 return row - def _call(self, bag): + def apply_on(self, bag): # todo add timer if getattr(self.component, '_with_context', False): return bag.apply(self.component, self) @@ -203,10 +233,29 @@ class ComponentExecutionContext(WithStatistics, AbstractLoopContext): def initialize(self): # pylint: disable=broad-except - assert self.state is NEW, ('A {} can only be run once, and thus is expected to be in {} state at ' - 'initialization time.').format(type(self).__name__, NEW) + assert self.state is NEW, ( + 'A {} can only be run once, and thus is expected to be in {} state at ' + 'initialization time.' + ).format(type(self).__name__, NEW) self.state = RUNNING - super().initialize() + + try: + initializer_outputs = super().initialize() + self.handle(None, initializer_outputs) + except Exception as exc: + self.handle_error(exc, traceback.format_exc()) + + def finalize(self): + # pylint: disable=broad-except + assert self.state is RUNNING, ('A {} must be in {} state at finalization time.' + ).format(type(self).__name__, RUNNING) + self.state = TERMINATED + + try: + finalizer_outputs = super().finalize() + self.handle(None, finalizer_outputs) + except Exception as exc: + self.handle_error(exc, traceback.format_exc()) def loop(self): while True: @@ -228,34 +277,39 @@ class ComponentExecutionContext(WithStatistics, AbstractLoopContext): output channel.""" input_bag = self.get() - outputs = self._call(input_bag) + outputs = self.apply_on(input_bag) + self.handle(input_bag, outputs) + def run(self): + self.initialize() + self.loop() + + def handle(self, input_bag, outputs): # self._exec_time += timer.duration # Put data onto output channels try: outputs = _iter(outputs) - except TypeError: + except TypeError: # not an iterator if outputs: - self.send(_resolve(input_bag, outputs)) + if isinstance(outputs, ErrorBag): + outputs.apply(self.handle_error) + else: + self.send(_resolve(input_bag, outputs)) else: # case with no result, an execution went through anyway, use for stats. # self._exec_count += 1 pass else: - while True: + while True: # iterator try: output = next(outputs) except StopIteration: break - self.send(_resolve(input_bag, output)) - - def finalize(self): - # pylint: disable=broad-except - assert self.state is RUNNING, ('A {} must be in {} state at finalization time.').format( - type(self).__name__, RUNNING) - self.state = TERMINATED - - super().finalize() + else: + if isinstance(output, ErrorBag): + output.apply(self.handle_error) + else: + self.send(_resolve(input_bag, output)) def _iter(mixed): diff --git a/bonobo/core/errors.py b/bonobo/core/errors.py index b82976d..7718de8 100644 --- a/bonobo/core/errors.py +++ b/bonobo/core/errors.py @@ -20,9 +20,12 @@ class AbstractError(NotImplementedError): """Abstract error is a convenient error to declare a method as "being left as an exercise for the reader".""" def __init__(self, method): - super().__init__('Call to abstract method {class_name}.{method_name}(...): missing implementation.'.format( - class_name=method.__self__.__name__, - method_name=method.__name__, )) + super().__init__( + 'Call to abstract method {class_name}.{method_name}(...): missing implementation.'.format( + class_name=method.__self__.__name__, + method_name=method.__name__, + ) + ) class InactiveIOError(IOError): @@ -39,9 +42,12 @@ class InactiveWritableError(InactiveIOError): class ValidationError(RuntimeError): def __init__(self, inst, message): - super(ValidationError, self).__init__('Validation error in {class_name}: {message}'.format( - class_name=type(inst).__name__, - message=message, )) + super(ValidationError, self).__init__( + 'Validation error in {class_name}: {message}'.format( + class_name=type(inst).__name__, + message=message, + ) + ) class ProhibitedOperationError(RuntimeError): diff --git a/bonobo/core/inputs.py b/bonobo/core/inputs.py index 1680e18..6598517 100644 --- a/bonobo/core/inputs.py +++ b/bonobo/core/inputs.py @@ -48,12 +48,17 @@ class Input(Queue, Readable, Writable): self._runlevel = 0 self._writable_runlevel = 0 + self.on_initialize = noop self.on_begin = noop self.on_end = noop + self.on_finalize = noop def put(self, data, block=True, timeout=None): # Begin token is a metadata to raise the input runlevel. if data == BEGIN: + if not self._runlevel: + self.on_initialize() + self._runlevel += 1 self._writable_runlevel += 1 @@ -78,14 +83,18 @@ class Input(Queue, Readable, Writable): data = Queue.get(self, block, timeout) if data == END: + if self._runlevel == 1: + self.on_finalize() + self._runlevel -= 1 # callback self.on_end() if not self.alive: - raise InactiveReadableError('Cannot get() on an inactive {} (runlevel just reached 0).'.format( - Readable.__name__)) + raise InactiveReadableError( + 'Cannot get() on an inactive {} (runlevel just reached 0).'.format(Readable.__name__) + ) return self.get(block, timeout) return data diff --git a/bonobo/core/services.py b/bonobo/core/services.py index 05d1724..8c53c6b 100644 --- a/bonobo/core/services.py +++ b/bonobo/core/services.py @@ -45,9 +45,11 @@ def inject(*iargs, **ikwargs): def wrapper(target): @functools.wraps(target) def wrapped(*args, **kwargs): - return target(*itertools.chain(map(resolve, iargs), args), - **{ ** kwargs, ** {k: resolve(v) - for k, v in ikwargs.items()}}) + return target( + *itertools.chain(map(resolve, iargs), args), + **{ ** kwargs, ** {k: resolve(v) + for k, v in ikwargs.items()}} + ) return wrapped diff --git a/bonobo/core/strategies/executor.py b/bonobo/core/strategies/executor.py index 038522f..804c7e2 100644 --- a/bonobo/core/strategies/executor.py +++ b/bonobo/core/strategies/executor.py @@ -2,10 +2,10 @@ import time from concurrent.futures import Executor from concurrent.futures import ProcessPoolExecutor from concurrent.futures import ThreadPoolExecutor +from threading import Thread from bonobo.core.strategies.base import Strategy from bonobo.util.tokens import BEGIN, END - from ..bags import Bag @@ -48,3 +48,31 @@ class ThreadPoolExecutorStrategy(ExecutorStrategy): class ProcessPoolExecutorStrategy(ExecutorStrategy): executor_factory = ProcessPoolExecutor + + +class ThreadCollectionStrategy(Strategy): + def execute(self, graph, *args, plugins=None, **kwargs): + context = self.create_context(graph, plugins=plugins) + context.recv(BEGIN, Bag(), END) + + threads = [] + + # for plugin_context in context.plugins: + # threads.append(executor.submit(plugin_context.run)) + + for component_context in context.components: + thread = Thread(target=component_context.run) + threads.append(thread) + thread.start() + + # XXX TODO PLUGINS + while context.alive and len(threads): + time.sleep(0.1) + threads = list(filter(lambda thread: thread.is_alive, threads)) + + # for plugin_context in context.plugins: + # plugin_context.shutdown() + + # executor.shutdown() + + return context diff --git a/bonobo/ext/console/plugin.py b/bonobo/ext/console/plugin.py index e23beb7..699a6c5 100644 --- a/bonobo/ext/console/plugin.py +++ b/bonobo/ext/console/plugin.py @@ -53,9 +53,10 @@ class ConsoleOutputPlugin(Plugin): def _write(self, context, rewind): profile, debug = False, False if profile: - append = (('Memory', '{0:.2f} Mb'.format(memory_usage())), - # ('Total time', '{0} s'.format(execution_time(harness))), - ) + append = ( + ('Memory', '{0:.2f} Mb'.format(memory_usage())), + # ('Total time', '{0} s'.format(execution_time(harness))), + ) else: append = () self.write(context, prefix=self.prefix, append=append, debug=debug, profile=profile, rewind=rewind) @@ -77,25 +78,35 @@ class ConsoleOutputPlugin(Plugin): for i, component in enumerate(context): if component.alive: - _line = ''.join(( - t.black('({})'.format(i + 1)), - ' ', - t.bold(t.white('+')), - ' ', - component.name, - ' ', - component.get_stats_as_string( - debug=debug, profile=profile), - ' ', )) + _line = ''.join( + ( + t.black('({})'.format(i + 1)), + ' ', + t.bold(t.white('+')), + ' ', + component.name, + ' ', + component.get_stats_as_string( + debug=debug, profile=profile + ), + ' ', + ) + ) else: - _line = t.black(''.join(( - '({})'.format(i + 1), - ' - ', - component.name, - ' ', - component.get_stats_as_string( - debug=debug, profile=profile), - ' ', ))) + _line = t.black( + ''.join( + ( + '({})'.format(i + 1), + ' - ', + component.name, + ' ', + component.get_stats_as_string( + debug=debug, profile=profile + ), + ' ', + ) + ) + ) print(prefix + _line + t.clear_eol) if append: diff --git a/bonobo/ext/jupyter/plugin.py b/bonobo/ext/jupyter/plugin.py index 63483ce..cb598a3 100644 --- a/bonobo/ext/jupyter/plugin.py +++ b/bonobo/ext/jupyter/plugin.py @@ -6,9 +6,11 @@ try: except ImportError as e: import logging - logging.exception('You must install Jupyter to use the bonobo Jupyter extension. Easiest way is to install the ' - 'optional "jupyter" dependencies with «pip install bonobo[jupyter]», but you can also install a ' - 'specific version by yourself.') + logging.exception( + 'You must install Jupyter to use the bonobo Jupyter extension. Easiest way is to install the ' + 'optional "jupyter" dependencies with «pip install bonobo[jupyter]», but you can also install a ' + 'specific version by yourself.' + ) class JupyterOutputPlugin(Plugin): diff --git a/bonobo/ext/opendatasoft.py b/bonobo/ext/opendatasoft.py index 9fb8d61..f996801 100644 --- a/bonobo/ext/opendatasoft.py +++ b/bonobo/ext/opendatasoft.py @@ -3,17 +3,20 @@ from urllib.parse import urlencode import requests # todo: make this a service so we can substitute it ? -def from_opendatasoft_api(dataset=None, - endpoint='{scheme}://{netloc}{path}', - scheme='https', - netloc='data.opendatasoft.com', - path='/api/records/1.0/search/', - rows=100, - **kwargs): +def from_opendatasoft_api( + dataset=None, + endpoint='{scheme}://{netloc}{path}', + scheme='https', + netloc='data.opendatasoft.com', + path='/api/records/1.0/search/', + rows=100, + **kwargs +): path = path if path.startswith('/') else '/' + path params = ( ('dataset', dataset), - ('rows', rows), ) + tuple(sorted(kwargs.items())) + ('rows', rows), + ) + tuple(sorted(kwargs.items())) base_url = endpoint.format(scheme=scheme, netloc=netloc, path=path) + '?' + urlencode(params) def _extract_ods(): diff --git a/bonobo/io/csv.py b/bonobo/io/csv.py index aae58d5..72998d2 100644 --- a/bonobo/io/csv.py +++ b/bonobo/io/csv.py @@ -59,7 +59,8 @@ class CsvReader(CsvHandler, FileReader): if len(row) != field_count: raise ValueError('Got a line with %d fields, expecting %d.' % ( len(row), - field_count, )) + field_count, + )) yield dict(zip(headers, row)) diff --git a/bonobo/io/file.py b/bonobo/io/file.py index fa6374e..71be39e 100644 --- a/bonobo/io/file.py +++ b/bonobo/io/file.py @@ -36,6 +36,7 @@ class FileHandler: :param ctx: :return: """ + assert not hasattr(ctx, 'file'), 'A file pointer is already in the context... I do not know what to say...' ctx.file = self.open() @@ -95,8 +96,8 @@ class FileWriter(Writer): mode = 'w+' def initialize(self, ctx): - super().initialize(ctx) ctx.line = 0 + return super().initialize(ctx) def write(self, ctx, row): """ @@ -114,4 +115,4 @@ class FileWriter(Writer): def finalize(self, ctx): del ctx.line - super().finalize(ctx) + return super().finalize(ctx) diff --git a/bonobo/util/__init__.py b/bonobo/util/__init__.py index 4a15b40..9da50a9 100644 --- a/bonobo/util/__init__.py +++ b/bonobo/util/__init__.py @@ -1,47 +1,116 @@ """ Various simple utilities. """ import functools -import pprint +from pprint import pprint as _pprint + +import blessings -from .tokens import NOT_MODIFIED from .helpers import run, console_run, jupyter_run +from .tokens import NOT_MODIFIED __all__ = [ 'NOT_MODIFIED', 'console_run', - 'head', 'jupyter_run', + 'limit', 'log', 'noop', + 'pprint', 'run', 'tee', ] -def head(n=10): +def identity(x): + return x + + +def limit(n=10): i = 0 - def _head(x): + def _limit(*args, **kwargs): nonlocal i, n i += 1 if i <= n: - yield x + yield NOT_MODIFIED - _head.__name__ = 'head({})'.format(n) - return _head + _limit.__name__ = 'limit({})'.format(n) + return _limit def tee(f): @functools.wraps(f) - def wrapped(x): + def wrapped(*args, **kwargs): nonlocal f - f(x) - return x + f(*args, **kwargs) + return NOT_MODIFIED return wrapped -log = tee(pprint.pprint) +log = tee(_pprint) + + +def pprint(title_keys=('title', 'name', 'id'), print_values=True, sort=True): + term = blessings.Terminal() + + def _pprint(*args, **kwargs): + nonlocal title_keys, term, sort, print_values + + row = args[0] + for key in title_keys: + if key in row: + print(term.bold(row.get(key))) + break + + if print_values: + for k in sorted(row) if sort else row: + print( + ' • {t.blue}{k}{t.normal} : {t.black}({tp}){t.normal} {v}{t.clear_eol}'.format( + k=k, v=repr(row[k]), t=term, tp=type(row[k]).__name__ + ) + ) + + yield NOT_MODIFIED + + _pprint.__name__ = 'pprint' + + return _pprint + + +''' + + def writehr(self, label=None): + width = t.width or 80 + + if label: + label = str(label) + sys.stderr.write(t.black('·' * 4) + shade('{') + label + shade('}') + t.black('·' * (width - (6+len(label)) - 1)) + '\n') + else: + sys.stderr.write(t.black('·' * (width-1) + '\n')) + + + def writeln(self, s): + """Output method.""" + sys.stderr.write(self.format(s) + '\n') + + def initialize(self): + self.lineno = 0 + + def transform(self, hash, channel=STDIN): + """Actual transformation.""" + self.lineno += 1 + if not self.condition or self.condition(hash): + hash = hash.copy() + hash = hash if not isinstance(self.field_filter, collections.Callable) else hash.restrict(self.field_filter) + if self.clean: + hash = hash.restrict(lambda k: len(k) and k[0] != '_') + self.writehr(self.lineno) + self.writeln(hash) + self.writehr() + sys.stderr.write('\n') + yield hash +''' def noop(*args, **kwargs): # pylint: disable=unused-argument diff --git a/bonobo/util/helpers.py b/bonobo/util/helpers.py index 5e3538f..35b6459 100644 --- a/bonobo/util/helpers.py +++ b/bonobo/util/helpers.py @@ -1,20 +1,20 @@ -def run(*chain, plugins=None): +def run(*chain, plugins=None, strategy=None): from bonobo import Graph, ThreadPoolExecutorStrategy graph = Graph() graph.add_chain(*chain) - executor = ThreadPoolExecutorStrategy() + executor = (strategy or ThreadPoolExecutorStrategy)() return executor.execute(graph, plugins=plugins or []) -def console_run(*chain, output=True, plugins=None): +def console_run(*chain, output=True, plugins=None, strategy=None): from bonobo.ext.console import ConsoleOutputPlugin - return run(*chain, plugins=(plugins or []) + [ConsoleOutputPlugin()] if output else []) + return run(*chain, plugins=(plugins or []) + [ConsoleOutputPlugin()] if output else [], strategy=strategy) -def jupyter_run(*chain, plugins=None): +def jupyter_run(*chain, plugins=None, strategy=None): from bonobo.ext.jupyter import JupyterOutputPlugin - return run(*chain, plugins=(plugins or []) + [JupyterOutputPlugin()]) + return run(*chain, plugins=(plugins or []) + [JupyterOutputPlugin()], strategy=strategy) diff --git a/docs/conf.py b/docs/conf.py index 8fb96ed..68e9d7a 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -153,8 +153,12 @@ man_pages = [(master_doc, 'bonobo', 'Bonobo Documentation', [author], 1)] # Grouping the document tree into Texinfo files. List of tuples # (source start file, target name, title, author, # dir menu entry, description, category) -texinfo_documents = [(master_doc, 'Bonobo', 'Bonobo Documentation', author, 'Bonobo', - 'One line description of project.', 'Miscellaneous'), ] +texinfo_documents = [ + ( + master_doc, 'Bonobo', 'Bonobo Documentation', author, 'Bonobo', 'One line description of project.', + 'Miscellaneous' + ), +] # -- Options for Epub output ---------------------------------------------- diff --git a/examples/opendata_fablabs.py b/examples/opendata_fablabs.py index b8a87d8..9dc7efa 100644 --- a/examples/opendata_fablabs.py +++ b/examples/opendata_fablabs.py @@ -40,10 +40,14 @@ def display(row): print(t.bold(row.get('name'))) address = list( - filter(None, ( - ' '.join(filter(None, (row.get('postal_code', None), row.get('city', None)))), - row.get('county', None), - row.get('country'), ))) + filter( + None, ( + ' '.join(filter(None, (row.get('postal_code', None), row.get('city', None)))), + row.get('county', None), + row.get('country'), + ) + ) + ) print(' - {}: {address}'.format(t.blue('address'), address=', '.join(address))) print(' - {}: {links}'.format(t.blue('links'), links=', '.join(row['links']))) @@ -54,9 +58,11 @@ def display(row): if __name__ == '__main__': console_run( from_opendatasoft_api( - API_DATASET, netloc=API_NETLOC, timezone='Europe/Paris'), + API_DATASET, netloc=API_NETLOC, timezone='Europe/Paris' + ), normalize, filter_france, tee(display), JsonWriter('fablabs.json'), - output=True, ) + output=True, + ) diff --git a/examples/read_cheap_coffeeshops_in_paris.py b/examples/read_cheap_coffeeshops_in_paris.py index 8e701e3..5c8f05c 100644 --- a/examples/read_cheap_coffeeshops_in_paris.py +++ b/examples/read_cheap_coffeeshops_in_paris.py @@ -8,8 +8,10 @@ OUTPUT_FILENAME = realpath(join(dirname(__file__), 'datasets/cheap_coffeeshops_i console_run( from_opendatasoft_api( - 'liste-des-cafes-a-un-euro', netloc='opendata.paris.fr'), + 'liste-des-cafes-a-un-euro', netloc='opendata.paris.fr' + ), lambda row: '{nom_du_cafe}, {adresse}, {arrondissement} Paris, France'.format(**row), - FileWriter(OUTPUT_FILENAME), ) + FileWriter(OUTPUT_FILENAME), +) print('Import done, read {} for results.'.format(OUTPUT_FILENAME)) diff --git a/examples/tutorial_basics_summary.py b/examples/tutorial_basics_summary.py index 60a5452..74a8f9a 100644 --- a/examples/tutorial_basics_summary.py +++ b/examples/tutorial_basics_summary.py @@ -9,7 +9,8 @@ def yield_from(*args): graph = Graph( lambda: (x for x in ('foo', 'bar', 'baz')), str.upper, - print, ) + print, +) # Use a thread pool. executor = ThreadPoolExecutorStrategy() diff --git a/setup.py b/setup.py index a19e5bd..9abfacf 100644 --- a/setup.py +++ b/setup.py @@ -33,16 +33,20 @@ setup( name='bonobo', description='Bonobo', license='Apache License, Version 2.0', - install_requires=['blessings >=1.6,<1.7', 'psutil >=5.0,<5.1'], + install_requires=['blessings >=1.6,<1.7', 'psutil >=5.0,<5.1', 'toolz >=0.8,<0.9'], version=version, long_description=read('README.rst'), classifiers=read('classifiers.txt', tolines), packages=find_packages(exclude=['ez_setup', 'example', 'test']), include_package_data=True, - data_files=[('share/jupyter/nbextensions/bonobo-jupyter', [ - 'bonobo/ext/jupyter/static/extension.js', 'bonobo/ext/jupyter/static/index.js', - 'bonobo/ext/jupyter/static/index.js.map' - ])], + data_files=[ + ( + 'share/jupyter/nbextensions/bonobo-jupyter', [ + 'bonobo/ext/jupyter/static/extension.js', 'bonobo/ext/jupyter/static/index.js', + 'bonobo/ext/jupyter/static/index.js.map' + ] + ) + ], extras_require={ 'dev': [ 'coverage >=4.2,<4.3', 'mock >=2.0,<2.1', 'nose >=1.3,<1.4', 'pylint >=1.6,<1.7', 'pytest >=3,<4', @@ -51,4 +55,5 @@ setup( 'jupyter': ['jupyter >=1.0,<1.1', 'ipywidgets >=6.0.0.beta5'] }, url='https://bonobo-project.org/', - download_url='https://github.com/python-bonobo/bonobo/tarball/{version}'.format(version=version), ) + download_url='https://github.com/python-bonobo/bonobo/tarball/{version}'.format(version=version), +) diff --git a/tests/core/test_bags.py b/tests/core/test_bags.py index 5998381..6c7bb62 100644 --- a/tests/core/test_bags.py +++ b/tests/core/test_bags.py @@ -5,7 +5,8 @@ from bonobo.core.bags import INHERIT_INPUT args = ( 'foo', - 'bar', ) + 'bar', +) kwargs = dict(acme='corp') @@ -40,13 +41,15 @@ def test_inherit(): assert bag2.args == ( 'a', - 'b', ) + 'b', + ) assert bag2.kwargs == {'a': 1, 'b': 2} assert INHERIT_INPUT in bag2.flags assert bag3.args == ( 'a', - 'c', ) + 'c', + ) assert bag3.kwargs == {'a': 1, 'c': 3} assert bag3.flags is () @@ -57,7 +60,8 @@ def test_inherit(): bag4.set_parent(bag) assert bag4.args == ( 'a', - 'd', ) + 'd', + ) assert bag4.kwargs == {'a': 1, 'd': 4} assert bag4.flags is () @@ -65,7 +69,8 @@ def test_inherit(): assert bag4.args == ( 'a', 'c', - 'd', ) + 'd', + ) assert bag4.kwargs == {'a': 1, 'c': 3, 'd': 4} assert bag4.flags is () diff --git a/tests/core/test_stats.py b/tests/core/test_stats.py index 820d4da..dd0b002 100644 --- a/tests/core/test_stats.py +++ b/tests/core/test_stats.py @@ -5,7 +5,8 @@ class MyThingWithStats(WithStatistics): def get_stats(self, *args, **kwargs): return ( ('foo', 42), - ('bar', 69), ) + ('bar', 69), + ) def test_with_statistics(): diff --git a/tests/ext/test_ods.py b/tests/ext/test_ods.py index eef0db7..79786f2 100644 --- a/tests/ext/test_ods.py +++ b/tests/ext/test_ods.py @@ -19,17 +19,18 @@ class ResponseMock: def test_read_from_opendatasoft_api(): extract = from_opendatasoft_api('http://example.com/', 'test-a-set') with patch( - 'requests.get', return_value=ResponseMock([ - { - 'fields': { - 'foo': 'bar' - } - }, - { - 'fields': { - 'foo': 'zab' - } - }, - ])): + 'requests.get', return_value=ResponseMock([ + { + 'fields': { + 'foo': 'bar' + } + }, + { + 'fields': { + 'foo': 'zab' + } + }, + ]) + ): for line in extract(): assert 'foo' in line diff --git a/tests/io/test_file.py b/tests/io/test_file.py index 32566a5..3816233 100644 --- a/tests/io/test_file.py +++ b/tests/io/test_file.py @@ -11,7 +11,8 @@ from bonobo.util.tokens import BEGIN, END [ (('ACME', ), 'ACME'), # one line... (('Foo', 'Bar', 'Baz'), 'Foo\nBar\nBaz'), # more than one line... - ]) + ] +) def test_file_writer_in_context(tmpdir, lines, output): file = tmpdir.join('output.txt')