From efcd4361cc94582f775c3df1b51b2cb8caf2e4ce Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Tue, 25 Apr 2017 22:04:21 +0200 Subject: [PATCH 1/5] First implementation of services and basic injection. Not working with CLI for now. --- Projectfile | 7 +- bonobo/__init__.py | 20 ++-- bonobo/basics.py | 100 +++++++++++++++++ bonobo/config/__init__.py | 11 ++ bonobo/{config.py => config/configurables.py} | 25 +---- bonobo/config/options.py | 27 +++++ bonobo/{context => config}/processors.py | 30 +++--- bonobo/config/services.py | 77 +++++++++++++ bonobo/context/__init__.py | 6 -- bonobo/core/__init__.py | 11 -- bonobo/core/inputs.py | 2 +- bonobo/core/services.py | 54 ---------- bonobo/core/strategies/__init__.py | 0 bonobo/examples/datasets/fablabs.py | 3 +- bonobo/examples/datasets/fablabs.txt | 5 +- bonobo/examples/utils/count.py | 3 +- bonobo/{context => }/execution.py | 102 +++++++++--------- bonobo/ext/opendatasoft.py | 5 +- bonobo/io/csv.py | 2 +- bonobo/io/file.py | 6 +- bonobo/io/json.py | 2 +- bonobo/strategies/__init__.py | 8 ++ bonobo/{core => }/strategies/base.py | 2 +- bonobo/{core => }/strategies/executor.py | 6 +- bonobo/{core => }/strategies/naive.py | 2 +- bonobo/{core => }/strategies/util.py | 0 bonobo/util/__init__.py | 102 ------------------ bonobo/util/compat.py | 14 +++ bonobo/util/iterators.py | 6 ++ bonobo/util/lifecycle.py | 2 +- bonobo/util/testing.py | 2 +- docs/guide/services.rst | 74 +++++++++++++ docs/roadmap.rst | 37 +++++++ tests/context/test_processors.py | 5 +- tests/core/test_services.py | 24 ----- tests/io/test_csv.py | 2 +- tests/io/test_file.py | 8 +- tests/io/test_json.py | 4 +- tests/test_basicusage.py | 3 +- tests/test_config.py | 58 +++++++++- .../test_contexts.py => test_execution.py} | 5 +- 41 files changed, 538 insertions(+), 324 deletions(-) create mode 100644 bonobo/basics.py create mode 100644 bonobo/config/__init__.py rename bonobo/{config.py => config/configurables.py} (74%) create mode 100644 bonobo/config/options.py rename bonobo/{context => config}/processors.py (86%) create mode 100644 bonobo/config/services.py delete mode 100644 bonobo/context/__init__.py delete mode 100644 bonobo/core/services.py delete mode 100644 bonobo/core/strategies/__init__.py rename bonobo/{context => }/execution.py (91%) create mode 100644 bonobo/strategies/__init__.py rename bonobo/{core => }/strategies/base.py (86%) rename bonobo/{core => }/strategies/executor.py (90%) rename bonobo/{core => }/strategies/naive.py (89%) rename bonobo/{core => }/strategies/util.py (100%) create mode 100644 docs/guide/services.rst create mode 100644 docs/roadmap.rst delete mode 100644 tests/core/test_services.py rename tests/{core/test_contexts.py => test_execution.py} (89%) diff --git a/Projectfile b/Projectfile index c6705ee..73e5ba1 100644 --- a/Projectfile +++ b/Projectfile @@ -35,11 +35,10 @@ extras_require = { 'dev': [ 'coverage >=4.3,<4.4', 'mock >=2.0,<2.1', - 'nose >=1.3,<1.4', - 'pylint >=1.6,<1.7', + 'pylint >=1,<2', 'pytest >=3,<4', - 'pytest-cov >=2.4,<2.5', - 'pytest-timeout >=1.2,<1.3', + 'pytest-cov >=2,<3', + 'pytest-timeout >=1,<2', 'sphinx', 'sphinx_rtd_theme', 'yapf', diff --git a/bonobo/__init__.py b/bonobo/__init__.py index b128955..39f51a1 100644 --- a/bonobo/__init__.py +++ b/bonobo/__init__.py @@ -12,13 +12,13 @@ import warnings assert (sys.version_info >= (3, 5)), 'Python 3.5+ is required to use Bonobo.' from ._version import __version__ +from .basics import __all__ as __all_basics__ from .config import __all__ as __all_config__ -from .context import __all__ as __all_context__ -from .core import __all__ as __all_core__ +from .execution import __all__ as __all_execution__ from .io import __all__ as __all_io__ -from .util import __all__ as __all_util__ +from .strategies import __all__ as __all_strategies__ -__all__ = __all_config__ + __all_context__ + __all_core__ + __all_io__ + __all_util__ + [ +__all__ = __all_basics__ + __all_config__ + __all_execution__ + __all_io__ + __all_strategies__ + [ 'Bag', 'ErrorBag' 'Graph', @@ -29,14 +29,14 @@ __all__ = __all_config__ + __all_context__ + __all_core__ + __all_io__ + __all_u 'run', ] +from .basics import * from .config import * -from .context import * -from .core import * +from .execution import * from .io import * +from .strategies import * from .structs.bags import * from .structs.graphs import * from .structs.tokens import * -from .util import * DEFAULT_STRATEGY = 'threadpool' @@ -54,7 +54,7 @@ def get_examples_path(*pathsegments): def create_strategy(name=None): - from bonobo.core.strategies.base import Strategy + from bonobo.strategies.base import Strategy import logging if isinstance(name, Strategy): @@ -87,7 +87,7 @@ def _is_jupyter_notebook(): return False -def run(graph, *chain, strategy=None, plugins=None): +def run(graph, *chain, strategy=None, plugins=None, services=None): if len(chain): warnings.warn('DEPRECATED. You should pass a Graph instance instead of a chain.') from bonobo import Graph @@ -106,7 +106,7 @@ def run(graph, *chain, strategy=None, plugins=None): if JupyterOutputPlugin not in plugins: plugins.append(JupyterOutputPlugin) - return strategy.execute(graph, plugins=plugins) + return strategy.execute(graph, plugins=plugins, services=services) del sys diff --git a/bonobo/basics.py b/bonobo/basics.py new file mode 100644 index 0000000..1eb8065 --- /dev/null +++ b/bonobo/basics.py @@ -0,0 +1,100 @@ +import functools +from pprint import pprint as _pprint + +from colorama import Fore, Style + +from bonobo.config.processors import contextual +from bonobo.constants import NOT_MODIFIED +from bonobo.structs.bags import Bag +from bonobo.util.objects import ValueHolder +from bonobo.util.term import CLEAR_EOL + +__all__ = [ + 'identity', + 'Limit', + 'Tee', + 'count', + 'pprint', + 'PrettyPrint', + 'noop', +] + +def identity(x): + return x + + +def Limit(n=10): + i = 0 + + def _limit(*args, **kwargs): + nonlocal i, n + i += 1 + if i <= n: + yield NOT_MODIFIED + + _limit.__name__ = 'Limit({})'.format(n) + return _limit + + +def Tee(f): + @functools.wraps(f) + def wrapped(*args, **kwargs): + nonlocal f + f(*args, **kwargs) + return NOT_MODIFIED + + return wrapped + + +@contextual +def count(counter, *args, **kwargs): + counter += 1 + + +@count.add_context_processor +def _count_counter(self, context): + counter = ValueHolder(0) + yield counter + context.send(Bag(counter.value)) + + +pprint = Tee(_pprint) + + +def PrettyPrint(title_keys=('title', 'name', 'id'), print_values=True, sort=True): + def _pprint(*args, **kwargs): + nonlocal title_keys, sort, print_values + + row = args[0] + for key in title_keys: + if key in row: + print(Style.BRIGHT, row.get(key), Style.RESET_ALL, sep='') + break + + if print_values: + for k in sorted(row) if sort else row: + print( + ' • ', + Fore.BLUE, + k, + Style.RESET_ALL, + ' : ', + Fore.BLACK, + '(', + type(row[k]).__name__, + ')', + Style.RESET_ALL, + ' ', + repr(row[k]), + CLEAR_EOL, + ) + + yield NOT_MODIFIED + + _pprint.__name__ = 'pprint' + + return _pprint + + +def noop(*args, **kwargs): # pylint: disable=unused-argument + return NOT_MODIFIED diff --git a/bonobo/config/__init__.py b/bonobo/config/__init__.py new file mode 100644 index 0000000..8f5ecc2 --- /dev/null +++ b/bonobo/config/__init__.py @@ -0,0 +1,11 @@ +from bonobo.config.configurables import Configurable +from bonobo.config.options import Option +from bonobo.config.services import Container, Service +from bonobo.config.processors import ContextProcessor + +__all__ = [ + 'Configurable', + 'Container', + 'Option', + 'Service', +] diff --git a/bonobo/config.py b/bonobo/config/configurables.py similarity index 74% rename from bonobo/config.py rename to bonobo/config/configurables.py index 4456e0d..d5bdeb3 100644 --- a/bonobo/config.py +++ b/bonobo/config/configurables.py @@ -1,26 +1,11 @@ -__all__ = [ - 'Configurable', - 'Option', -] - - -class Option: - def __init__(self, type=None, *, required=False, default=None): - self.name = None - self.type = type - self.required = required - self.default = default - - def __get__(self, inst, typ): - if not self.name in inst.__options_values__: - inst.__options_values__[self.name] = self.default() if callable(self.default) else self.default - return inst.__options_values__[self.name] - - def __set__(self, inst, value): - inst.__options_values__[self.name] = self.type(value) if self.type else value +from bonobo.config.options import Option class ConfigurableMeta(type): + """ + Metaclass for Configurables that will add options to a special __options__ dict. + """ + def __init__(cls, what, bases=None, dict=None): super().__init__(what, bases, dict) cls.__options__ = {} diff --git a/bonobo/config/options.py b/bonobo/config/options.py new file mode 100644 index 0000000..4a7904b --- /dev/null +++ b/bonobo/config/options.py @@ -0,0 +1,27 @@ +class Option: + """ + An Option is a descriptor for a required or optional parameter of a Configurable. + + """ + _creation_counter = 0 + + def __init__(self, type=None, *, required=False, default=None): + self.name = None + self.type = type + self.required = required + self.default = default + + # This hack is necessary for python3.5 + self._creation_counter = Option._creation_counter + Option._creation_counter += 1 + + def get_default(self): + return self.default() if callable(self.default) else self.default + + def __get__(self, inst, typ): + if not self.name in inst.__options_values__: + inst.__options_values__[self.name] = self.get_default() + return inst.__options_values__[self.name] + + def __set__(self, inst, value): + inst.__options_values__[self.name] = self.type(value) if self.type else value \ No newline at end of file diff --git a/bonobo/context/processors.py b/bonobo/config/processors.py similarity index 86% rename from bonobo/context/processors.py rename to bonobo/config/processors.py index 76ef4eb..64a4bee 100644 --- a/bonobo/context/processors.py +++ b/bonobo/config/processors.py @@ -1,20 +1,12 @@ -from functools import partial +import functools import types +from bonobo.util.compat import deprecated_alias + _CONTEXT_PROCESSORS_ATTR = '__processors__' -def get_context_processors(mixed): - if isinstance(mixed, types.FunctionType): - yield from getattr(mixed, _CONTEXT_PROCESSORS_ATTR, ()) - - for cls in reversed((mixed if isinstance(mixed, type) else type(mixed)).__mro__): - yield from cls.__dict__.get(_CONTEXT_PROCESSORS_ATTR, ()) - - return () - - class ContextProcessor: _creation_counter = 0 @@ -47,7 +39,7 @@ def contextual(cls_or_func): :param cls_or_func: """ if not add_context_processor.__name__ in cls_or_func.__dict__: - setattr(cls_or_func, add_context_processor.__name__, partial(add_context_processor, cls_or_func)) + setattr(cls_or_func, add_context_processor.__name__, functools.partial(add_context_processor, cls_or_func)) if isinstance(cls_or_func, types.FunctionType): try: @@ -63,6 +55,20 @@ def contextual(cls_or_func): for name, value in cls_or_func.__dict__.items(): if isinstance(value, ContextProcessor): _processors.append(value) + # This is needed for python 3.5, python 3.6 should be fine, but it's considered an implementation detail. _processors.sort(key=lambda proc: proc._creation_counter) return cls_or_func + + +def resolve_processors(mixed): + if isinstance(mixed, types.FunctionType): + yield from getattr(mixed, _CONTEXT_PROCESSORS_ATTR, ()) + + for cls in reversed((mixed if isinstance(mixed, type) else type(mixed)).__mro__): + yield from cls.__dict__.get(_CONTEXT_PROCESSORS_ATTR, ()) + + return () + + +get_context_processors = deprecated_alias('get_context_processors', resolve_processors) diff --git a/bonobo/config/services.py b/bonobo/config/services.py new file mode 100644 index 0000000..e2dfc5c --- /dev/null +++ b/bonobo/config/services.py @@ -0,0 +1,77 @@ +import re + +from bonobo.config.options import Option + +_service_name_re = re.compile(r"^[^\d\W]\w*(:?\.[^\d\W]\w*)*$", re.UNICODE) + + +def validate_service_name(name): + if not _service_name_re.match(name): + raise ValueError('Invalid service name {!r}.'.format(name)) + return name + + +class Service(Option): + """ + A Service is a special kind of option defining a dependency to something that will be resolved at runtime, using an + identifier. For example, you can create a Configurable that has a "database" Service in its attribute, meaning that + you'll define which database to use, by name, when creating the instance of this class, then provide an + implementation when running the graph using a strategy. + + Example:: + + import bonobo + + class QueryExtractor(bonobo.Configurable): + database = bonobo.Service(default='sqlalchemy.engine.default') + + graph = bonobo.Graph( + QueryExtractor(database='sqlalchemy.engine.secondary'), + *more_transformations, + ) + + if __name__ == '__main__': + engine = create_engine('... dsn ...') + bonobo.run(graph, services={ + 'sqlalchemy.engine.secondary': engine + }) + + The main goal is not to tie transformations to actual dependencies, so the same can be run in different contexts + (stages like preprod, prod, or tenants like client1, client2, or anything you want). + + """ + + def __init__(self, type=None, *, required=False, default=None): + super().__init__(type, required=required, default=default) + + def __set__(self, inst, value): + inst.__options_values__[self.name] = validate_service_name(value) + + def resolve(self, inst, services): + name = getattr(inst, self.name) + if not name in services: + raise KeyError('Cannot resolve service {!r} using provided service collection.'.format(name)) + return services.get(name) + + +class Container(dict): + def __new__(cls, *args, **kwargs): + if len(args) == 1: + assert not len(kwargs), 'only one usage at a time, my dear.' + if not(args[0]): + return super().__new__(cls) + if isinstance(args[0], cls): + return cls + return super().__new__(cls, *args, **kwargs) + + def args_for(self, mixed): + try: + options = mixed.__options__ + except AttributeError: + options = {} + + return tuple( + option.resolve(mixed, self) + for name, option in options.items() + if isinstance(option, Service) + ) diff --git a/bonobo/context/__init__.py b/bonobo/context/__init__.py deleted file mode 100644 index 2300929..0000000 --- a/bonobo/context/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -from bonobo.context.processors import contextual, ContextProcessor - -__all__ = [ - 'ContextProcessor', - 'contextual', -] diff --git a/bonobo/core/__init__.py b/bonobo/core/__init__.py index 287e62c..f5f8416 100644 --- a/bonobo/core/__init__.py +++ b/bonobo/core/__init__.py @@ -1,13 +1,2 @@ """ Core required libraries. """ -from .services import inject, service -from .strategies.executor import ThreadPoolExecutorStrategy, ProcessPoolExecutorStrategy -from .strategies.naive import NaiveStrategy - -__all__ = [ - 'NaiveStrategy', - 'ProcessPoolExecutorStrategy', - 'ThreadPoolExecutorStrategy', - 'inject', - 'service', -] diff --git a/bonobo/core/inputs.py b/bonobo/core/inputs.py index 3c6b481..f41ff01 100644 --- a/bonobo/core/inputs.py +++ b/bonobo/core/inputs.py @@ -19,7 +19,7 @@ from queue import Queue from bonobo.errors import AbstractError, InactiveWritableError, InactiveReadableError from bonobo.constants import BEGIN, END -from bonobo.util import noop +from bonobo.basics import noop BUFFER_SIZE = 8192 diff --git a/bonobo/core/services.py b/bonobo/core/services.py deleted file mode 100644 index ce0c0c3..0000000 --- a/bonobo/core/services.py +++ /dev/null @@ -1,54 +0,0 @@ -import functools -import itertools - - -class service: - def __init__(self, factory): - self.factory = factory - self.instance = None - # self.__call__ = functools.wraps(self.__call__) - - self.children = set() - - def __call__(self, *args, **kwargs): - if self.instance is None: - self.instance = self.factory(*args, **kwargs) - return self.instance - - def __getitem__(self, item): - if item not in self.children: - raise KeyError(item) - return item - - def define(self, *args, **kwargs): - new_service = type(self)(functools.partial(self.factory, *args, **kwargs)) - self.children.add(new_service) - return new_service - - -call = lambda s: s() - - -def resolve(func): - return func() - - -def inject(*iargs, **ikwargs): - """ - Inject service dependencies. - - TODO: ikwargs are ignored, implement that - """ - - def wrapper(target): - @functools.wraps(target) - def wrapped(*args, **kwargs): - return target( - *itertools.chain(map(resolve, iargs), args), - **{ ** kwargs, ** {k: resolve(v) - for k, v in ikwargs.items()}} - ) - - return wrapped - - return wrapper diff --git a/bonobo/core/strategies/__init__.py b/bonobo/core/strategies/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/bonobo/examples/datasets/fablabs.py b/bonobo/examples/datasets/fablabs.py index 1d3eeaf..80f6e29 100644 --- a/bonobo/examples/datasets/fablabs.py +++ b/bonobo/examples/datasets/fablabs.py @@ -1,7 +1,8 @@ import json import os -from bonobo import Tee, JsonWriter, Graph, get_examples_path +from bonobo import JsonWriter, Graph, get_examples_path +from bonobo.basics import Tee from bonobo.ext.opendatasoft import OpenDataSoftAPI from colorama import Fore, Style diff --git a/bonobo/examples/datasets/fablabs.txt b/bonobo/examples/datasets/fablabs.txt index be1bc7b..bf5cc4a 100644 --- a/bonobo/examples/datasets/fablabs.txt +++ b/bonobo/examples/datasets/fablabs.txt @@ -130,4 +130,7 @@ {"city": "Clermont-Ferrand", "kind_name": "mini_fab_lab", "links": ["http://acolab.fr"], "capabilities": "three_d_printing;circuit_production;vinyl_cutting", "url": "https://www.fablabs.io/labs/acolab", "coordinates": [45.7941993299, 3.07563051059], "name": "ACoLab", "phone": "+33(0)651800518", "avatar_url": "http://fablabs.io.s3.amazonaws.com/2017/01/28/11/15/48/fd42c5cd-21ac-4abf-9a20-8f9bb602c7b1/ACoLab.jpg", "postal_code": "63000", "longitude": 3.07563051058958, "country_code": "fr", "latitude": 45.7941993298608, "address_1": "2 bis rue du Clos Perret", "address_notes": "Au quatri\u00e8me \u00e9tage du b\u00e2timent, entr\u00e9e par le 2bis rue du Clos Perret\r\n\r\nAdresse 'historique' (2013/Mai2015), chez les Petits D\u00e9brouillards d'Auvergne : 32 Rue du Pont Naturel, 63000 Clermont-Ferrand\r\nIl faut traverser la petite place entre les immeubles et descendre quelques marches.", "email": "contact@acolab.fr", "blurb": "Atelier Collaboratif - Ouvert les lundi et mercredi soir", "description": "FabLab associatif cr\u00e9e en 2013\r\n\u00c9quip\u00e9 d'une d\u00e9coupeuse vinyle, d'une imprimante 3D type Mendel Max, d'un petit tour \u00e0 m\u00e9taux, utilisation d'Arduino, de Raspberry Pi...\r\n\r\nBeaucoup de r\u00e9cup\u00e9ration et de bidouillages vari\u00e9s dans la bonne humeur et le partage.", "geometry": {"type": "Point", "coordinates": [3.07563051059, 45.7941993299]}, "country": "France"}, {"city": "brest", "kind_name": "fab_lab", "links": ["http://wiki.lesfabriquesduponant.net", "http://www.lesfabriquesduponant.net"], "url": "https://www.fablabs.io/labs/lesfabriquesduponant", "name": "Les Fabriques du Ponant", "longitude": -4.47982980000006, "header_url": "http://fablabs.io.s3.amazonaws.com/2017/01/28/13/48/32/6d2e62f0-0f08-424a-883e-b9a15e90ee8a/Les Fabriques du Ponant.jpg", "phone": "+33.685176295", "avatar_url": "http://fablabs.io.s3.amazonaws.com/2017/02/08/10/11/15/20e09c48-5ac6-40fc-8462-bce909c24de0/531px-Logofabdupo.png", "postal_code": "29200", "coordinates": [48.4086189, -4.4798298], "country_code": "fr", "latitude": 48.4086189, "address_1": "40, rue Jules Lesven", "capabilities": "three_d_printing;cnc_milling;circuit_production;laser;vinyl_cutting", "email": "contact@lesfabriquesduponant.net", "blurb": "\"Les Fabrique du Ponant\" is run by \"T\u00e9l\u00e9com Bretagne\" and \"Les petits d\u00e9brouillards\". Its main goal is to propose digital manufacturing services, organise digital cultural events and digital education", "description": "Installed in high school Vauban in Brest, \"Les Fabrique du Ponant\" (which can be translate in \"Factories Ponant\") offer a coworking space, a fully equipped fablab, a webTV studio, a training room. \"Les Fabrique du Ponant\" organize demonstrations (initiation days and discovery), cultural events on digital as the \"Open Bidouille Camp\" or \"Science Hack Day\", trainings, educational activities.", "geometry": {"type": "Point", "coordinates": [-4.4798298, 48.4086189]}, "country": "France"}, {"city": "Tours", "coordinates": [47.3932037, 0.6687421], "kind_name": "mini_fab_lab", "links": ["http://funlab.fr"], "url": "https://www.fablabs.io/labs/funlab", "name": "FunLab Tours", "longitude": 0.668742100000031, "header_url": "http://fablabs.io.s3.amazonaws.com/2017/01/28/13/50/06/7863f4ba-28b3-4018-9351-c1d8c70a5b69/FunLab Tours.jpg", "phone": "+33603951216", "avatar_url": "http://fablabs.io.s3.amazonaws.com/2017/01/28/11/13/52/7d30f2aa-d5b7-482a-8334-a72d17e0a6fe/FunLab Tours.jpg", "postal_code": "37000", "capabilities": "three_d_printing;cnc_milling;laser;vinyl_cutting", "country_code": "fr", "latitude": 47.3932037, "address_1": "49, boulevard Preuilly", "address_notes": "Nous sommes occupants de site MAME \"cit\u00e9 de la cr\u00e9ation et du num\u00e9rique\"", "email": "contact@funlab.fr", "blurb": "Fabrique d'Usages Num\u00e9riques", "description": "La communaut\u00e9 existe, des rencontres toutes les semaines. 49, Boulevard Preuilly, 37000 Tours.", "geometry": {"type": "Point", "coordinates": [0.6687421, 47.3932037]}, "country": "France"}, -{"city": "Bron", "kind_name": "fab_lab", "links": ["http://fablab-lyon.fr"], "capabilities": "three_d_printing;cnc_milling;laser;vinyl_cutting", "url": "https://www.fablabs.io/labs/fabriquedobjetslibres", "name": "Fabrique d'Objets Libres", "email": "contact@fabriquedobjetslibres.fr", "coordinates": [45.7429334, 4.9082135], "header_url": "http://fablabs.io.s3.amazonaws.com/2017/01/28/13/50/01/0190e790-aaec-4f2f-9985-11156655145d/Fabrique d'Objets Libres.jpg", "county": "Rh\u00f4ne", "phone": "+33 7 68 01 40 26 (Tue-Sat 2pm-6pm)", "avatar_url": "http://fablabs.io.s3.amazonaws.com/2017/01/28/11/13/49/73ea9f2d-0216-4f52-a6bf-2ff97ee474b2/Fabrique d'Objets Libres.jpg", "postal_code": "69500", "longitude": 4.90821349999999, "country_code": "fr", "latitude": 45.7429334, "address_1": "All\u00e9e Gaillard Romanet", "address_notes": "Au sous-sol de la MJC. Downstairs inside the MJC.", "address_2": "MJC Louis Aragon", "blurb": "Le fablab lyonnais, install\u00e9 \u00e0 la MJC Louis Aragon de Bron, ouvert tous les mercredis et formation hebdomadaire de fabrication num\u00e9rique. Projets autour du handicap, des arts et du recyclage.", "description": "La Fabrique d'Objets Libres est un fablab associatif sur Lyon et sa r\u00e9gion. Install\u00e9 \u00e0 la MJC Louis Aragon de Bron depuis janvier 2013, c'est un espace de cr\u00e9ation et de fabrication num\u00e9rique ouvert \u00e0 tous, qui permet \u00e0 chacun de d\u00e9couvrir, d'inventer et de fabriquer tout type d'objet.\r\n \r\nV\u00e9ritable laboratoire citoyen de fabrication, la Fabrique d\u2019Objets Libres met \u00e0 disposition de ses adh\u00e9rents des outils \u00e0 commande num\u00e9rique et des mati\u00e8res premi\u00e8res secondaires permettant de concevoir et de fabriquer localement des objets libres.\r\nC\u2019est une plate-forme pluridisciplinaire collaborative qui m\u00eale les profils (techniciens, informaticiens, ing\u00e9nieurs, scientifiques, bricoleurs, cr\u00e9ateurs...) et les g\u00e9n\u00e9rations afin de r\u00e9unir tous types de comp\u00e9tences.\r\n\r\nLe fablab est ouvert tous les mercredis pour les \"temps libres\", durant lesquels les adh\u00e9rents utilisent les machines librement. Par ailleurs, il propose un atelier hebdomadaire aux adh\u00e9rents de la MJC, \"De l'id\u00e9e \u00e0 l'objet\": en une dizaine de s\u00e9ances sur un trimestre, les participants apprennent \u00e0 utiliser toutes les machines du fablab pour r\u00e9aliser leurs objets, et r\u00e9fl\u00e9chissent autour d'une th\u00e9matique sociale comme le handicap, la musique, ou la ville.\r\n\r\nL'association organise \u00e9galement des \u00e9v\u00e9nements et ateliers th\u00e9matiques utilisant la fabrication num\u00e9rique autour de sujet plus vastes, comme l'art, avec les machines \u00e0 dessiner, ou le handicap, dans le cadre du projet Handilab, ou encore la fin de vie des objets, avec le Laboratoire de l'Obsolescence D\u00e9programm\u00e9e. Enfin, le fablab s'associe \u00e0 d'autres associations et des entreprises pour des projets communs.", "geometry": {"type": "Point", "coordinates": [4.9082135, 45.7429334]}, "country": "France"} \ No newline at end of file +{"city": "Bron", "kind_name": "fab_lab", "links": ["http://fablab-lyon.fr"], "capabilities": "three_d_printing;cnc_milling;laser;vinyl_cutting", "url": "https://www.fablabs.io/labs/fabriquedobjetslibres", "name": "Fabrique d'Objets Libres", "email": "contact@fabriquedobjetslibres.fr", "coordinates": [45.7429334, 4.9082135], "header_url": "http://fablabs.io.s3.amazonaws.com/2017/01/28/13/50/01/0190e790-aaec-4f2f-9985-11156655145d/Fabrique d'Objets Libres.jpg", "county": "Rh\u00f4ne", "phone": "+33 7 68 01 40 26 (Tue-Sat 2pm-6pm)", "avatar_url": "http://fablabs.io.s3.amazonaws.com/2017/01/28/11/13/49/73ea9f2d-0216-4f52-a6bf-2ff97ee474b2/Fabrique d'Objets Libres.jpg", "postal_code": "69500", "longitude": 4.90821349999999, "country_code": "fr", "latitude": 45.7429334, "address_1": "All\u00e9e Gaillard Romanet", "address_notes": "Au sous-sol de la MJC. Downstairs inside the MJC.", "address_2": "MJC Louis Aragon", "blurb": "Le fablab lyonnais, install\u00e9 \u00e0 la MJC Louis Aragon de Bron, ouvert tous les mercredis et formation hebdomadaire de fabrication num\u00e9rique. Projets autour du handicap, des arts et du recyclage.", "description": "La Fabrique d'Objets Libres est un fablab associatif sur Lyon et sa r\u00e9gion. Install\u00e9 \u00e0 la MJC Louis Aragon de Bron depuis janvier 2013, c'est un espace de cr\u00e9ation et de fabrication num\u00e9rique ouvert \u00e0 tous, qui permet \u00e0 chacun de d\u00e9couvrir, d'inventer et de fabriquer tout type d'objet.\r\n \r\nV\u00e9ritable laboratoire citoyen de fabrication, la Fabrique d\u2019Objets Libres met \u00e0 disposition de ses adh\u00e9rents des outils \u00e0 commande num\u00e9rique et des mati\u00e8res premi\u00e8res secondaires permettant de concevoir et de fabriquer localement des objets libres.\r\nC\u2019est une plate-forme pluridisciplinaire collaborative qui m\u00eale les profils (techniciens, informaticiens, ing\u00e9nieurs, scientifiques, bricoleurs, cr\u00e9ateurs...) et les g\u00e9n\u00e9rations afin de r\u00e9unir tous types de comp\u00e9tences.\r\n\r\nLe fablab est ouvert tous les mercredis pour les \"temps libres\", durant lesquels les adh\u00e9rents utilisent les machines librement. Par ailleurs, il propose un atelier hebdomadaire aux adh\u00e9rents de la MJC, \"De l'id\u00e9e \u00e0 l'objet\": en une dizaine de s\u00e9ances sur un trimestre, les participants apprennent \u00e0 utiliser toutes les machines du fablab pour r\u00e9aliser leurs objets, et r\u00e9fl\u00e9chissent autour d'une th\u00e9matique sociale comme le handicap, la musique, ou la ville.\r\n\r\nL'association organise \u00e9galement des \u00e9v\u00e9nements et ateliers th\u00e9matiques utilisant la fabrication num\u00e9rique autour de sujet plus vastes, comme l'art, avec les machines \u00e0 dessiner, ou le handicap, dans le cadre du projet Handilab, ou encore la fin de vie des objets, avec le Laboratoire de l'Obsolescence D\u00e9programm\u00e9e. Enfin, le fablab s'associe \u00e0 d'autres associations et des entreprises pour des projets communs.", "geometry": {"type": "Point", "coordinates": [4.9082135, 45.7429334]}, "country": "France"}, +{"city": "N\u00e9ons-sur-Creuse", "kind_name": "fab_lab", "links": ["http://www.rurallab.org"], "url": "https://www.fablabs.io/labs/rurallab", "coordinates": [46.744746, 0.931698], "header_url": "http://fablabs.io.s3.amazonaws.com/2017/01/28/13/49/00/95c7b9f2-a034-4b2b-931d-43ced33ddfb1/RuralLab.jpg", "phone": "+33603318810", "avatar_url": "http://fablabs.io.s3.amazonaws.com/2017/01/28/11/12/49/ec5f7c54-e6ce-40fd-b5c5-c4142d208e6b/RuralLab.jpg", "postal_code": "36220", "longitude": 0.931697999999983, "country_code": "fr", "latitude": 46.744746, "address_1": "Rue de l'\u00c9cole", "email": "rurallab36@gmail.com", "blurb": "A FabLab in the countryside in Neons sur Creuse, France", "name": "RuralLab", "geometry": {"type": "Point", "coordinates": [0.931698, 46.744746]}, "country": "France"}, +{"city": "Gif-sur-Yvette", "kind_name": "supernode", "links": ["http://fablab.digiscope.fr/#!/", "http://fablabdigiscope.wordpress.com"], "url": "https://www.fablabs.io/labs/fablabdigiscope", "name": "(Fab)Lab Digiscope", "longitude": 2.16830979999997, "header_url": "http://fablabs.io.s3.amazonaws.com/2017/01/28/13/52/18/8d63351d-c2fb-4a90-8e58-bb45422202a6/(Fab)Lab Digiscope.jpg", "avatar_url": "http://fablabs.io.s3.amazonaws.com/2017/01/28/11/15/46/51553da4-b295-426c-837f-934c311933ba/(Fab)Lab Digiscope.jpg", "postal_code": "91190", "coordinates": [48.7117632, 2.1683098], "country_code": "fr", "latitude": 48.7117632, "address_1": "660 Rue Noetzlin", "capabilities": "three_d_printing;cnc_milling;circuit_production;laser;precision_milling;vinyl_cutting", "email": "fablabdigiscope@gmail.com", "blurb": "(FAB)LAB DIGISCOPE is a fabrication laboratory dedicated to research in sciences | design | education | art | engineering and what ever field of research you come from. Open to Everyone. Book now!", "description": "(FAB)LAB DIGISCOPE is a fabrication laboratory dedicated to research in sciences | design | education | arts | engineering and what ever field of research you come from. We host Fab Academy and Bio Academy. We host Digital Fabrication Classes for EITC Master. Open to Everyone since the beginning.\r\n\r\nFablab Digiscope started in 2013 when Aviz-INRIA research team director Jean-Daniel Fekete and colleague researcher Pierre Dragicevic hired Romain Di Vozzo as a R&D Engineer to be the fablab manager of what would later become an attractive place on the new Campus Paris-Saclay. Fablab Digiscope is part of the Digiscope Project, a network of 10 high-performance platforms for interactive visualization of large datasets and complex computation for which Michel Beaudouin-Lafon is the scientific Director. Fablab Digiscope is mutualised between 10 institutions involved in research and education.\r\n\r\nRomain Di Vozzo runs and develops Fablab Digiscope everyday, trains publics, designs objects, shares creative thoughts, gives advices on designs, etc. Romain also actively collaborates to the globally distributed fablab network and with the Fab Foundation by operating as Fab Academy SuperNode, as Instructor for Fab Academy and Bio Academy, by giving conferences and workshops in France and abroad and by performing very small tasks that make the fablab network grow.", "geometry": {"type": "Point", "coordinates": [2.1683098, 48.7117632]}, "country": "France"}, +{"city": "Metz", "kind_name": "fab_lab", "links": ["http://graoulab.org/wiki", "http://graoulab.org"], "url": "https://www.fablabs.io/labs/graoulab", "coordinates": [49.1262692, 6.182086], "name": "GraouLab", "avatar_url": "http://fablabs.io.s3.amazonaws.com/2017/01/28/11/18/24/af4709d8-1f60-48a7-ba35-4c42ef40a195/GraouLab.jpg", "postal_code": "57000", "longitude": 6.18208600000003, "country_code": "fr", "latitude": 49.1262692, "capabilities": "three_d_printing;laser", "email": "contact@graoulab.org", "blurb": "The FabLab of Metz. A place for folks innovation.", "address_1": "7 Avenue de Blida", "geometry": {"type": "Point", "coordinates": [6.182086, 49.1262692]}, "country": "France"} \ No newline at end of file diff --git a/bonobo/examples/utils/count.py b/bonobo/examples/utils/count.py index d1bd12f..fdcc3bf 100644 --- a/bonobo/examples/utils/count.py +++ b/bonobo/examples/utils/count.py @@ -1,6 +1,7 @@ import bonobo +import bonobo.basics -graph = bonobo.Graph(range(42), bonobo.count, print) +graph = bonobo.Graph(range(42), bonobo.basics.count, print) if __name__ == '__main__': bonobo.run(graph) diff --git a/bonobo/context/execution.py b/bonobo/execution.py similarity index 91% rename from bonobo/context/execution.py rename to bonobo/execution.py index 750dd56..c23fe8c 100644 --- a/bonobo/context/execution.py +++ b/bonobo/execution.py @@ -1,17 +1,25 @@ -import traceback import sys +import traceback from functools import partial from queue import Empty from time import sleep -from bonobo.constants import BEGIN, END, NOT_MODIFIED, INHERIT_INPUT -from bonobo.context.processors import get_context_processors +from bonobo.config import Container +from bonobo.config.processors import resolve_processors +from bonobo.constants import BEGIN, END, INHERIT_INPUT, NOT_MODIFIED from bonobo.core.inputs import Input from bonobo.core.statistics import WithStatistics from bonobo.errors import InactiveReadableError from bonobo.structs.bags import Bag, ErrorBag +from bonobo.util.iterators import ensure_tuple from bonobo.util.objects import Wrapper +__all__ = [ + 'GraphExecutionContext', + 'NodeExecutionContext', + 'PluginExecutionContext', +] + class GraphExecutionContext: @property @@ -26,10 +34,11 @@ class GraphExecutionContext: def alive(self): return any(node.alive for node in self.nodes) - def __init__(self, graph, plugins=None): + def __init__(self, graph, plugins=None, services=None): self.graph = graph self.nodes = [NodeExecutionContext(node, parent=self) for node in self.graph.nodes] self.plugins = [PluginExecutionContext(plugin, parent=self) for plugin in plugins or ()] + self.services = Container(services) if services else Container() for i, component_context in enumerate(self): try: @@ -74,12 +83,6 @@ class GraphExecutionContext: node.stop() -def ensure_tuple(tuple_or_mixed): - if isinstance(tuple_or_mixed, tuple): - return tuple_or_mixed - return (tuple_or_mixed, ) - - class LoopingExecutionContext(Wrapper): alive = True PERIOD = 0.25 @@ -105,13 +108,16 @@ class LoopingExecutionContext(Wrapper): assert self.state == (False, False), ('{}.start() can only be called on a new node.').format(type(self).__name__) assert self._context is None - self._started = True - self._context = () - for processor in get_context_processors(self.wrapped): - _processed = processor(self.wrapped, self, *self._context) + try: + self._context = self.parent.services.args_for(self.wrapped) if self.parent else () + except Exception as exc: # pylint: disable=broad-except + self.handle_error(exc, traceback.format_exc()) + raise + + for processor in resolve_processors(self.wrapped): try: - # todo yield from ? + _processed = processor(self.wrapped, self, *self._context) _append_to_context = next(_processed) if _append_to_context is not None: self._context += ensure_tuple(_append_to_context) @@ -127,9 +133,7 @@ class LoopingExecutionContext(Wrapper): sleep(self.PERIOD) def step(self): - """ - TODO xxx this is a step, not a loop - """ + """Left as an exercise for the children.""" raise NotImplementedError('Abstract.') def stop(self): @@ -177,37 +181,6 @@ class LoopingExecutionContext(Wrapper): print(trace) -class PluginExecutionContext(LoopingExecutionContext): - PERIOD = 0.5 - - def __init__(self, wrapped, parent): - # Instanciate plugin. This is not yet considered stable, as at some point we may need a way to configure - # plugins, for example if it depends on an external service. - super().__init__(wrapped(self), parent) - - def start(self): - super().start() - - try: - self.wrapped.initialize() - except Exception as exc: # pylint: disable=broad-except - self.handle_error(exc, traceback.format_exc()) - - def shutdown(self): - try: - self.wrapped.finalize() - except Exception as exc: # pylint: disable=broad-except - self.handle_error(exc, traceback.format_exc()) - finally: - self.alive = False - - def step(self): - try: - self.wrapped.run() - except Exception as exc: # pylint: disable=broad-except - self.handle_error(exc, traceback.format_exc()) - - class NodeExecutionContext(WithStatistics, LoopingExecutionContext): """ todo: make the counter dependant of parent context? @@ -313,6 +286,37 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): self.send(_resolve(input_bag, output)) +class PluginExecutionContext(LoopingExecutionContext): + PERIOD = 0.5 + + def __init__(self, wrapped, parent): + # Instanciate plugin. This is not yet considered stable, as at some point we may need a way to configure + # plugins, for example if it depends on an external service. + super().__init__(wrapped(self), parent) + + def start(self): + super().start() + + try: + self.wrapped.initialize() + except Exception as exc: # pylint: disable=broad-except + self.handle_error(exc, traceback.format_exc()) + + def shutdown(self): + try: + self.wrapped.finalize() + except Exception as exc: # pylint: disable=broad-except + self.handle_error(exc, traceback.format_exc()) + finally: + self.alive = False + + def step(self): + try: + self.wrapped.run() + except Exception as exc: # pylint: disable=broad-except + self.handle_error(exc, traceback.format_exc()) + + def _iter(mixed): if isinstance(mixed, (dict, list, str)): raise TypeError(type(mixed).__name__) diff --git a/bonobo/ext/opendatasoft.py b/bonobo/ext/opendatasoft.py index 0d255d7..0831054 100644 --- a/bonobo/ext/opendatasoft.py +++ b/bonobo/ext/opendatasoft.py @@ -2,8 +2,9 @@ from urllib.parse import urlencode import requests # todo: make this a service so we can substitute it ? -from bonobo.config import Configurable, Option -from bonobo.context import ContextProcessor, contextual +from bonobo.config import Option +from bonobo.config.processors import ContextProcessor, contextual +from bonobo.config.configurables import Configurable from bonobo.util.compat import deprecated from bonobo.util.objects import ValueHolder diff --git a/bonobo/io/csv.py b/bonobo/io/csv.py index 73856b7..54a3fe6 100644 --- a/bonobo/io/csv.py +++ b/bonobo/io/csv.py @@ -1,7 +1,7 @@ import csv from bonobo.config import Option -from bonobo.context import ContextProcessor, contextual +from bonobo.config.processors import ContextProcessor, contextual from bonobo.util.objects import ValueHolder from .file import FileReader, FileWriter, FileHandler diff --git a/bonobo/io/file.py b/bonobo/io/file.py index 880baca..25c4f84 100644 --- a/bonobo/io/file.py +++ b/bonobo/io/file.py @@ -1,8 +1,8 @@ from io import BytesIO -from bonobo.config import Configurable, Option -from bonobo.context import ContextProcessor -from bonobo.context.processors import contextual +from bonobo.config import Option +from bonobo.config.processors import ContextProcessor, contextual +from bonobo.config.configurables import Configurable from bonobo.util.objects import ValueHolder __all__ = [ diff --git a/bonobo/io/json.py b/bonobo/io/json.py index e9d34f1..50b84d5 100644 --- a/bonobo/io/json.py +++ b/bonobo/io/json.py @@ -1,6 +1,6 @@ import json -from bonobo.context import ContextProcessor, contextual +from bonobo.config.processors import ContextProcessor, contextual from .file import FileWriter, FileReader __all__ = [ diff --git a/bonobo/strategies/__init__.py b/bonobo/strategies/__init__.py new file mode 100644 index 0000000..f912c2a --- /dev/null +++ b/bonobo/strategies/__init__.py @@ -0,0 +1,8 @@ +from bonobo.strategies.executor import ThreadPoolExecutorStrategy, ProcessPoolExecutorStrategy +from bonobo.strategies.naive import NaiveStrategy + +__all__ = [ + 'NaiveStrategy', + 'ProcessPoolExecutorStrategy', + 'ThreadPoolExecutorStrategy', +] diff --git a/bonobo/core/strategies/base.py b/bonobo/strategies/base.py similarity index 86% rename from bonobo/core/strategies/base.py rename to bonobo/strategies/base.py index 08262a8..7758241 100644 --- a/bonobo/core/strategies/base.py +++ b/bonobo/strategies/base.py @@ -1,4 +1,4 @@ -from bonobo.context.execution import GraphExecutionContext +from bonobo.execution import GraphExecutionContext class Strategy: diff --git a/bonobo/core/strategies/executor.py b/bonobo/strategies/executor.py similarity index 90% rename from bonobo/core/strategies/executor.py rename to bonobo/strategies/executor.py index aa70fe2..d3ad119 100644 --- a/bonobo/core/strategies/executor.py +++ b/bonobo/strategies/executor.py @@ -3,7 +3,7 @@ import time from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor from bonobo.constants import BEGIN, END -from bonobo.core.strategies.base import Strategy +from bonobo.strategies.base import Strategy from bonobo.structs.bags import Bag @@ -18,8 +18,8 @@ class ExecutorStrategy(Strategy): def create_executor(self): return self.executor_factory() - def execute(self, graph, *args, plugins=None, **kwargs): - context = self.create_graph_execution_context(graph, plugins=plugins) + def execute(self, graph, *args, plugins=None, services=None, **kwargs): + context = self.create_graph_execution_context(graph, plugins=plugins, services=services) context.recv(BEGIN, Bag(), END) executor = self.create_executor() diff --git a/bonobo/core/strategies/naive.py b/bonobo/strategies/naive.py similarity index 89% rename from bonobo/core/strategies/naive.py rename to bonobo/strategies/naive.py index 0a2df40..b93a2e9 100644 --- a/bonobo/core/strategies/naive.py +++ b/bonobo/strategies/naive.py @@ -1,5 +1,5 @@ from bonobo.constants import BEGIN, END -from bonobo.core.strategies.base import Strategy +from bonobo.strategies.base import Strategy from bonobo.structs.bags import Bag diff --git a/bonobo/core/strategies/util.py b/bonobo/strategies/util.py similarity index 100% rename from bonobo/core/strategies/util.py rename to bonobo/strategies/util.py diff --git a/bonobo/util/__init__.py b/bonobo/util/__init__.py index bc0b6ac..8b13789 100644 --- a/bonobo/util/__init__.py +++ b/bonobo/util/__init__.py @@ -1,103 +1 @@ -""" Various simple utilities. """ -import functools -from pprint import pprint as _pprint - -from colorama import Fore, Style - -from bonobo.constants import NOT_MODIFIED -from bonobo.context.processors import contextual -from bonobo.structs.bags import Bag -from bonobo.util.objects import ValueHolder -from bonobo.util.term import CLEAR_EOL - -__all__ = [ - 'Limit', - 'NOT_MODIFIED', - 'PrettyPrint', - 'Tee', - 'count', - 'noop', - 'pprint', -] - - -def identity(x): - return x - - -def Limit(n=10): - i = 0 - - def _limit(*args, **kwargs): - nonlocal i, n - i += 1 - if i <= n: - yield NOT_MODIFIED - - _limit.__name__ = 'Limit({})'.format(n) - return _limit - - -def Tee(f): - @functools.wraps(f) - def wrapped(*args, **kwargs): - nonlocal f - f(*args, **kwargs) - return NOT_MODIFIED - - return wrapped - - -@contextual -def count(counter, *args, **kwargs): - counter += 1 - - -@count.add_context_processor -def _count_counter(self, context): - counter = ValueHolder(0) - yield counter - context.send(Bag(counter.value)) - - -pprint = Tee(_pprint) - - -def PrettyPrint(title_keys=('title', 'name', 'id'), print_values=True, sort=True): - def _pprint(*args, **kwargs): - nonlocal title_keys, sort, print_values - - row = args[0] - for key in title_keys: - if key in row: - print(Style.BRIGHT, row.get(key), Style.RESET_ALL, sep='') - break - - if print_values: - for k in sorted(row) if sort else row: - print( - ' • ', - Fore.BLUE, - k, - Style.RESET_ALL, - ' : ', - Fore.BLACK, - '(', - type(row[k]).__name__, - ')', - Style.RESET_ALL, - ' ', - repr(row[k]), - CLEAR_EOL, - ) - - yield NOT_MODIFIED - - _pprint.__name__ = 'pprint' - - return _pprint - - -def noop(*args, **kwargs): # pylint: disable=unused-argument - return NOT_MODIFIED diff --git a/bonobo/util/compat.py b/bonobo/util/compat.py index 8777f09..35b2500 100644 --- a/bonobo/util/compat.py +++ b/bonobo/util/compat.py @@ -26,6 +26,20 @@ def is_platform_32bit(): return struct.calcsize("P") * 8 < 64 +def deprecated_alias(alias, func): + @functools.wraps(func) + def new_func(*args, **kwargs): + warnings.simplefilter('always', DeprecationWarning) # turn off filter + warnings.warn( + "Call to deprecated function alias {}, use {} instead.".format(alias, func.__name__), + category=DeprecationWarning, stacklevel=2 + ) + warnings.simplefilter('default', DeprecationWarning) # reset filter + return func(*args, **kwargs) + + return new_func + + def deprecated(func): """This is a decorator which can be used to mark functions as deprecated. It will result in a warning being emmitted diff --git a/bonobo/util/iterators.py b/bonobo/util/iterators.py index 69bb75b..0a79f96 100644 --- a/bonobo/util/iterators.py +++ b/bonobo/util/iterators.py @@ -15,3 +15,9 @@ def force_iterator(mixed): return iter(mixed) except TypeError: return [mixed] if mixed else [] + + +def ensure_tuple(tuple_or_mixed): + if isinstance(tuple_or_mixed, tuple): + return tuple_or_mixed + return (tuple_or_mixed,) \ No newline at end of file diff --git a/bonobo/util/lifecycle.py b/bonobo/util/lifecycle.py index 16acf68..c497f41 100644 --- a/bonobo/util/lifecycle.py +++ b/bonobo/util/lifecycle.py @@ -1,4 +1,4 @@ -from bonobo.util import noop +from bonobo.basics import noop def _create_lifecycle_functions(noun, verb): diff --git a/bonobo/util/testing.py b/bonobo/util/testing.py index 2e0d20a..6f3fa57 100644 --- a/bonobo/util/testing.py +++ b/bonobo/util/testing.py @@ -1,6 +1,6 @@ from unittest.mock import MagicMock -from bonobo.context.execution import NodeExecutionContext +from bonobo.execution import NodeExecutionContext class CapturingNodeExecutionContext(NodeExecutionContext): diff --git a/docs/guide/services.rst b/docs/guide/services.rst new file mode 100644 index 0000000..709b9a3 --- /dev/null +++ b/docs/guide/services.rst @@ -0,0 +1,74 @@ +Services and dependencies (draft implementation) +================================================ + +Most probably, you'll want to use external systems within your transformations. Those systems may include databases, +apis (using http, for example), filesystems, etc. + +For a start, including those services hardcoded in your transformations can do the job, but you'll pretty soon feel +limited, for two main reasons: + +* Hardcoded and tightly linked dependencies make your transformation atoms hard to test. +* Processing data on your laptop is great, but being able to do it on different systems (or stages), in different + environments, is more realistic. + +Service injection +::::::::::::::::: + +To solve this problem, we introduce a light dependency injection system that basically allows you to define named +dependencies in your transformations, and provide an implementation at runtime. + +Let's define such a transformation: + +.. code-block:: python + + from bonobo.config import Configurable, Service + + class JoinDatabaseCategories(Configurable): + database = Service(default='primary_sql_database') + + def __call__(self, database, row): + return { + **row, + 'category': database.get_category_name_for_sku(row['sku']) + } + +This piece of code tells bonobo that your transformation expect a sercive called "primary_sql_database", that will be +injected to your calls under the parameter name "database". + +Let's see how to execute it: + +.. code-block:: python + + import bonobo + + bonobo.run( + [...extract...], + JoinDatabaseCategories(), + [...load...], + services={ + 'primary_sql_database': my_database_service, + } + ) + +Future +:::::: + +This is the first proposed implementation and it will evolve, but looks a lot like how we used bonobo ancestor in +production. + +You can expect to see the following features pretty soon: + +* Singleton or prototype based injection (to use spring terminology, see + https://www.tutorialspoint.com/spring/spring_bean_scopes.htm), allowing smart factory usage and efficient sharing of + resources. +* Lazily resolved parameters, eventually overriden by command line or environment, so you can for example override the + database DSN or target filesystem on command line (or with shell environment). +* Pool based locks that ensure that only one (or n) transformations are using a given service at the same time. + +This is under heavy development, let us know what you think (slack may be a good place for this). + + +Read more +::::::::: + +todo: example code. diff --git a/docs/roadmap.rst b/docs/roadmap.rst new file mode 100644 index 0000000..23993ae --- /dev/null +++ b/docs/roadmap.rst @@ -0,0 +1,37 @@ +Detailed roadmap +================ + +Next... +::::::: + +* Release process specialised for bonobo. With changelog production, etc. +* Document how to upgrade version, like, minor need change badges, etc. +* PyPI page looks like crap: https://pypi.python.org/pypi/bonobo/0.2.1 +* Windows break because of readme encoding. Fix in edgy. +* bonobo init --with sqlalchemy,docker +* logger, vebosity level +* Console run should allow console plugin as a command line argument (or silence it). +* ContextProcessors not clean + +Version 0.3 +::::::::::: + +* Services ! +* SQLAlchemy 101 + +Version 0.2 +::::::::::: + +* Autodetect if within jupyter notebook context, and apply plugin if it's the case. +* New bonobo.structs package with simple data structures (bags, graphs, tokens). + +Plugins API +::::::::::: + +* Stabilize, find other things to do. + +Minor stuff +::::::::::: + +* Should we include datasets in the repo or not? As they may change, grow, and even eventually have licenses we can't use, + it's probably best if we don't. \ No newline at end of file diff --git a/tests/context/test_processors.py b/tests/context/test_processors.py index a0832ea..bf62500 100644 --- a/tests/context/test_processors.py +++ b/tests/context/test_processors.py @@ -1,7 +1,6 @@ from operator import attrgetter -from bonobo import contextual, ContextProcessor -from bonobo.context.processors import get_context_processors +from bonobo.config.processors import ContextProcessor, contextual, resolve_processors @contextual @@ -46,7 +45,7 @@ class CP3(CP2): def get_all_processors_names(cls): - return list(map(attrgetter('__name__'), get_context_processors(cls))) + return list(map(attrgetter('__name__'), resolve_processors(cls))) def test_inheritance_and_ordering(): diff --git a/tests/core/test_services.py b/tests/core/test_services.py deleted file mode 100644 index 3a847c8..0000000 --- a/tests/core/test_services.py +++ /dev/null @@ -1,24 +0,0 @@ -from bonobo import inject, service - - -class MyFoo(): - pass - - -def test_service_is_singleton(): - @service - def foo(): - return MyFoo() - - assert foo() is foo() - - @inject(foo) - def bar(myfoo): - assert myfoo is foo() - - bar() - - foo2 = foo.define() - - assert type(foo()) == type(foo2()) - assert foo2() is not foo() diff --git a/tests/io/test_csv.py b/tests/io/test_csv.py index d22d3db..87904a9 100644 --- a/tests/io/test_csv.py +++ b/tests/io/test_csv.py @@ -2,7 +2,7 @@ import pytest from bonobo import Bag, CsvReader, CsvWriter from bonobo.constants import BEGIN, END -from bonobo.context.execution import NodeExecutionContext +from bonobo.execution import NodeExecutionContext from bonobo.util.testing import CapturingNodeExecutionContext diff --git a/tests/io/test_file.py b/tests/io/test_file.py index 2fda59f..234a323 100644 --- a/tests/io/test_file.py +++ b/tests/io/test_file.py @@ -1,15 +1,15 @@ import pytest -from bonobo import FileWriter, Bag, FileReader -from bonobo.context.execution import NodeExecutionContext -from bonobo.util.testing import CapturingNodeExecutionContext +from bonobo import Bag, FileReader, FileWriter from bonobo.constants import BEGIN, END +from bonobo.execution import NodeExecutionContext +from bonobo.util.testing import CapturingNodeExecutionContext @pytest.mark.parametrize( 'lines,output', [ - (('ACME', ), 'ACME'), # one line... + (('ACME',), 'ACME'), # one line... (('Foo', 'Bar', 'Baz'), 'Foo\nBar\nBaz'), # more than one line... ] ) diff --git a/tests/io/test_json.py b/tests/io/test_json.py index 89594c4..8f6d84b 100644 --- a/tests/io/test_json.py +++ b/tests/io/test_json.py @@ -1,8 +1,8 @@ import pytest -from bonobo import Bag, JsonWriter, JsonReader +from bonobo import Bag, JsonReader, JsonWriter from bonobo.constants import BEGIN, END -from bonobo.context.execution import NodeExecutionContext +from bonobo.execution import NodeExecutionContext from bonobo.util.testing import CapturingNodeExecutionContext diff --git a/tests/test_basicusage.py b/tests/test_basicusage.py index 8889687..282cc85 100644 --- a/tests/test_basicusage.py +++ b/tests/test_basicusage.py @@ -1,11 +1,12 @@ import pytest import bonobo as bb +import bonobo.basics @pytest.mark.timeout(2) def test_run_graph_noop(): - graph = bb.Graph(bb.noop) + graph = bb.Graph(bonobo.basics.noop) assert len(graph) == 1 result = bb.run(graph, strategy='threadpool') diff --git a/tests/test_config.py b/tests/test_config.py index daa08d8..e8cf2b9 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,6 +1,8 @@ import pytest -from bonobo import Configurable, Option +from bonobo.config.configurables import Configurable +from bonobo.config.options import Option +from bonobo.config.services import Container, Service, validate_service_name class MyConfigurable(Configurable): @@ -17,6 +19,26 @@ class MyBetterConfigurable(MyConfigurable): required_str = Option(str, required=False, default='kaboom') +class PrinterInterface(): + def print(self, *args): + raise NotImplementedError() + + +class ConcretePrinter(PrinterInterface): + def __init__(self, prefix): + self.prefix = prefix + + def print(self, *args): + return ';'.join((self.prefix, *args)) + + +class MyServiceDependantConfigurable(Configurable): + printer = Service(PrinterInterface, ) + + def __call__(self, printer: PrinterInterface, *args): + return printer.print(*args) + + def test_missing_required_option_error(): with pytest.raises(TypeError) as exc: MyConfigurable() @@ -75,3 +97,37 @@ def test_option_resolution_order(): assert o.required_str == 'kaboom' assert o.default_str == 'foo' assert o.integer == None + + +def test_service_name_validator(): + assert validate_service_name('foo') == 'foo' + assert validate_service_name('foo.bar') == 'foo.bar' + assert validate_service_name('Foo') == 'Foo' + assert validate_service_name('Foo.Bar') == 'Foo.Bar' + assert validate_service_name('Foo.a0') == 'Foo.a0' + + with pytest.raises(ValueError): + validate_service_name('foo.0') + + with pytest.raises(ValueError): + validate_service_name('0.foo') + + +SERVICES = Container( + printer0=ConcretePrinter(prefix='0'), + printer1=ConcretePrinter(prefix='1'), +) + + +def test_service_dependency(): + o = MyServiceDependantConfigurable(printer='printer0') + + assert o(SERVICES.get('printer0'), 'foo', 'bar') == '0;foo;bar' + assert o(SERVICES.get('printer1'), 'bar', 'baz') == '1;bar;baz' + assert o(*SERVICES.args_for(o), 'foo', 'bar') == '0;foo;bar' + + +def test_service_dependency_unavailable(): + o = MyServiceDependantConfigurable(printer='printer2') + with pytest.raises(KeyError): + SERVICES.args_for(o) diff --git a/tests/core/test_contexts.py b/tests/test_execution.py similarity index 89% rename from tests/core/test_contexts.py rename to tests/test_execution.py index 0b56120..dafd7e1 100644 --- a/tests/core/test_contexts.py +++ b/tests/test_execution.py @@ -1,6 +1,7 @@ -from bonobo import Graph, NaiveStrategy, Bag, contextual +from bonobo import Graph, NaiveStrategy, Bag +from bonobo.config.processors import contextual from bonobo.constants import BEGIN, END -from bonobo.context.execution import GraphExecutionContext +from bonobo.execution import GraphExecutionContext def generate_integers(): From 9b87597c243f9b57b8b5c1f73369ee58d1411495 Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Tue, 25 Apr 2017 22:38:39 +0200 Subject: [PATCH 2/5] Small change to force services definitions to be strings, and to force default names. --- bonobo/config/services.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bonobo/config/services.py b/bonobo/config/services.py index e2dfc5c..2dc38a4 100644 --- a/bonobo/config/services.py +++ b/bonobo/config/services.py @@ -41,8 +41,8 @@ class Service(Option): """ - def __init__(self, type=None, *, required=False, default=None): - super().__init__(type, required=required, default=default) + def __init__(self, name): + super().__init__(str, required=False, default=name) def __set__(self, inst, value): inst.__options_values__[self.name] = validate_service_name(value) From ad502d7e238e4637848e79423ee0757f0575eb67 Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Tue, 25 Apr 2017 23:05:18 +0200 Subject: [PATCH 3/5] Split execution module in submodules for better readability. --- Makefile | 2 +- Projectfile | 2 +- bonobo/execution.py | 340 ----------------------------------- bonobo/execution/__init__.py | 9 + bonobo/execution/base.py | 105 +++++++++++ bonobo/execution/graph.py | 68 +++++++ bonobo/execution/node.py | 133 ++++++++++++++ bonobo/execution/plugin.py | 34 ++++ bonobo/strategies/base.py | 2 +- bonobo/util/iterators.py | 8 +- bonobo/util/testing.py | 2 +- setup.py | 36 ++-- tests/io/test_csv.py | 2 +- tests/io/test_file.py | 2 +- tests/io/test_json.py | 2 +- tests/test_execution.py | 2 +- 16 files changed, 384 insertions(+), 365 deletions(-) delete mode 100644 bonobo/execution.py create mode 100644 bonobo/execution/__init__.py create mode 100644 bonobo/execution/base.py create mode 100644 bonobo/execution/graph.py create mode 100644 bonobo/execution/node.py create mode 100644 bonobo/execution/plugin.py diff --git a/Makefile b/Makefile index 76ff025..b8435c4 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ # This file has been auto-generated. # All changes will be lost, see Projectfile. # -# Updated at 2017-04-24 23:47:46.325867 +# Updated at 2017-04-25 23:05:05.062813 PYTHON ?= $(shell which python) PYTHON_BASENAME ?= $(shell basename $(PYTHON)) diff --git a/Projectfile b/Projectfile index 73e5ba1..c02391f 100644 --- a/Projectfile +++ b/Projectfile @@ -23,7 +23,7 @@ enable_features = { install_requires = [ 'colorama >=0.3,<0.4', 'psutil >=5.2,<5.3', - 'requests >=2.12,<2.13', + 'requests >=2.13,<2.14', 'stevedore >=1.19,<1.20', ] diff --git a/bonobo/execution.py b/bonobo/execution.py deleted file mode 100644 index c23fe8c..0000000 --- a/bonobo/execution.py +++ /dev/null @@ -1,340 +0,0 @@ -import sys -import traceback -from functools import partial -from queue import Empty -from time import sleep - -from bonobo.config import Container -from bonobo.config.processors import resolve_processors -from bonobo.constants import BEGIN, END, INHERIT_INPUT, NOT_MODIFIED -from bonobo.core.inputs import Input -from bonobo.core.statistics import WithStatistics -from bonobo.errors import InactiveReadableError -from bonobo.structs.bags import Bag, ErrorBag -from bonobo.util.iterators import ensure_tuple -from bonobo.util.objects import Wrapper - -__all__ = [ - 'GraphExecutionContext', - 'NodeExecutionContext', - 'PluginExecutionContext', -] - - -class GraphExecutionContext: - @property - def started(self): - return any(node.started for node in self.nodes) - - @property - def stopped(self): - return all(node.started and node.stopped for node in self.nodes) - - @property - def alive(self): - return any(node.alive for node in self.nodes) - - def __init__(self, graph, plugins=None, services=None): - self.graph = graph - self.nodes = [NodeExecutionContext(node, parent=self) for node in self.graph.nodes] - self.plugins = [PluginExecutionContext(plugin, parent=self) for plugin in plugins or ()] - self.services = Container(services) if services else Container() - - for i, component_context in enumerate(self): - try: - component_context.outputs = [self[j].input for j in self.graph.outputs_of(i)] - except KeyError: - continue - - component_context.input.on_begin = partial(component_context.send, BEGIN, _control=True) - component_context.input.on_end = partial(component_context.send, END, _control=True) - component_context.input.on_finalize = partial(component_context.stop) - - def __getitem__(self, item): - return self.nodes[item] - - def __len__(self): - return len(self.nodes) - - def __iter__(self): - yield from self.nodes - - def recv(self, *messages): - """Push a list of messages in the inputs of this graph's inputs, matching the output of special node "BEGIN" in - our graph.""" - - for i in self.graph.outputs_of(BEGIN): - for message in messages: - self[i].recv(message) - - def start(self): - # todo use strategy - for node in self.nodes: - node.start() - - def loop(self): - # todo use strategy - for node in self.nodes: - node.loop() - - def stop(self): - # todo use strategy - for node in self.nodes: - node.stop() - - -class LoopingExecutionContext(Wrapper): - alive = True - PERIOD = 0.25 - - @property - def state(self): - return self._started, self._stopped - - @property - def started(self): - return self._started - - @property - def stopped(self): - return self._stopped - - def __init__(self, wrapped, parent): - super().__init__(wrapped) - self.parent = parent - self._started, self._stopped, self._context, self._stack = False, False, None, [] - - def start(self): - assert self.state == (False, - False), ('{}.start() can only be called on a new node.').format(type(self).__name__) - assert self._context is None - self._started = True - try: - self._context = self.parent.services.args_for(self.wrapped) if self.parent else () - except Exception as exc: # pylint: disable=broad-except - self.handle_error(exc, traceback.format_exc()) - raise - - for processor in resolve_processors(self.wrapped): - try: - _processed = processor(self.wrapped, self, *self._context) - _append_to_context = next(_processed) - if _append_to_context is not None: - self._context += ensure_tuple(_append_to_context) - except Exception as exc: # pylint: disable=broad-except - self.handle_error(exc, traceback.format_exc()) - raise - self._stack.append(_processed) - - def loop(self): - """Generic loop. A bit boring. """ - while self.alive: - self.step() - sleep(self.PERIOD) - - def step(self): - """Left as an exercise for the children.""" - raise NotImplementedError('Abstract.') - - def stop(self): - assert self._started, ('{}.stop() can only be called on a previously started node.').format(type(self).__name__) - if self._stopped: - return - - assert self._context is not None - - self._stopped = True - while len(self._stack): - processor = self._stack.pop() - try: - # todo yield from ? how to ? - next(processor) - except StopIteration as exc: - # This is normal, and wanted. - pass - except Exception as exc: # pylint: disable=broad-except - self.handle_error(exc, traceback.format_exc()) - raise - else: - # No error ? We should have had StopIteration ... - raise RuntimeError('Context processors should not yield more than once.') - - def handle_error(self, exc, trace): - """ - Error handler. Whatever happens in a plugin or component, if it looks like an exception, taste like an exception - or somehow make me think it is an exception, I'll handle it. - - :param exc: the culprit - :param trace: Hercule Poirot's logbook. - :return: to hell - """ - - from colorama import Fore, Style - print( - Style.BRIGHT, - Fore.RED, - '\U0001F4A3 {} in {}'.format(type(exc).__name__, self.wrapped), - Style.RESET_ALL, - sep='', - file=sys.stderr, - ) - print(trace) - - -class NodeExecutionContext(WithStatistics, LoopingExecutionContext): - """ - todo: make the counter dependant of parent context? - """ - - @property - def alive(self): - """todo check if this is right, and where it is used""" - return self.input.alive and self._started and not self._stopped - - def __init__(self, wrapped, parent): - LoopingExecutionContext.__init__(self, wrapped, parent) - WithStatistics.__init__(self, 'in', 'out', 'err') - - self.input = Input() - self.outputs = [] - - def __str__(self): - return (('+' if self.alive else '-') + ' ' + self.__name__ + ' ' + self.get_statistics_as_string()).strip() - - def __repr__(self): - return '<' + self.__str__() + '>' - - def recv(self, *messages): - """ - Push a message list to this context's input queue. - - :param mixed value: message - """ - for message in messages: - self.input.put(message) - - def send(self, value, _control=False): - """ - Sends a message to all of this context's outputs. - - :param mixed value: message - :param _control: if true, won't count in statistics. - """ - if not _control: - self.increment('out') - for output in self.outputs: - output.put(value) - - def get(self): - """ - Get from the queue first, then increment stats, so if Queue raise Timeout or Empty, stat won't be changed. - - """ - row = self.input.get(timeout=self.PERIOD) - self.increment('in') - return row - - def loop(self): - while True: - try: - self.step() - except KeyboardInterrupt: - raise - except InactiveReadableError: - break - except Empty: - sleep(self.PERIOD) - continue - except Exception as exc: # pylint: disable=broad-except - self.handle_error(exc, traceback.format_exc()) - - def step(self): - # Pull data from the first available input channel. - """Runs a transformation callable with given args/kwargs and flush the result into the right - output channel.""" - - input_bag = self.get() - - # todo add timer - self.handle_results(input_bag, input_bag.apply(self.wrapped, *self._context)) - - def handle_results(self, input_bag, results): - # self._exec_time += timer.duration - # Put data onto output channels - try: - results = _iter(results) - except TypeError: # not an iterator - if results: - if isinstance(results, ErrorBag): - results.apply(self.handle_error) - else: - self.send(_resolve(input_bag, results)) - else: - # case with no result, an execution went through anyway, use for stats. - # self._exec_count += 1 - pass - else: - while True: # iterator - try: - output = next(results) - except StopIteration: - break - else: - if isinstance(output, ErrorBag): - output.apply(self.handle_error) - else: - self.send(_resolve(input_bag, output)) - - -class PluginExecutionContext(LoopingExecutionContext): - PERIOD = 0.5 - - def __init__(self, wrapped, parent): - # Instanciate plugin. This is not yet considered stable, as at some point we may need a way to configure - # plugins, for example if it depends on an external service. - super().__init__(wrapped(self), parent) - - def start(self): - super().start() - - try: - self.wrapped.initialize() - except Exception as exc: # pylint: disable=broad-except - self.handle_error(exc, traceback.format_exc()) - - def shutdown(self): - try: - self.wrapped.finalize() - except Exception as exc: # pylint: disable=broad-except - self.handle_error(exc, traceback.format_exc()) - finally: - self.alive = False - - def step(self): - try: - self.wrapped.run() - except Exception as exc: # pylint: disable=broad-except - self.handle_error(exc, traceback.format_exc()) - - -def _iter(mixed): - if isinstance(mixed, (dict, list, str)): - raise TypeError(type(mixed).__name__) - return iter(mixed) - - -def _resolve(input_bag, output): - # NotModified means to send the input unmodified to output. - if output is NOT_MODIFIED: - return input_bag - - # If it does not look like a bag, let's create one for easier manipulation - if hasattr(output, 'apply'): - # Already a bag? Check if we need to set parent. - if INHERIT_INPUT in output.flags: - output.set_parent(input_bag) - else: - # Not a bag? Let's encapsulate it. - output = Bag(output) - - return output diff --git a/bonobo/execution/__init__.py b/bonobo/execution/__init__.py new file mode 100644 index 0000000..aaf4ba3 --- /dev/null +++ b/bonobo/execution/__init__.py @@ -0,0 +1,9 @@ +from bonobo.execution.graph import GraphExecutionContext, NodeExecutionContext, PluginExecutionContext + +__all__ = [ + 'GraphExecutionContext', + 'NodeExecutionContext', + 'PluginExecutionContext', +] + + diff --git a/bonobo/execution/base.py b/bonobo/execution/base.py new file mode 100644 index 0000000..b84cb70 --- /dev/null +++ b/bonobo/execution/base.py @@ -0,0 +1,105 @@ +import sys +import traceback +from time import sleep + +from bonobo.config.processors import resolve_processors +from bonobo.util.iterators import ensure_tuple +from bonobo.util.objects import Wrapper + + +class LoopingExecutionContext(Wrapper): + alive = True + PERIOD = 0.25 + + @property + def state(self): + return self._started, self._stopped + + @property + def started(self): + return self._started + + @property + def stopped(self): + return self._stopped + + def __init__(self, wrapped, parent): + super().__init__(wrapped) + self.parent = parent + self._started, self._stopped, self._context, self._stack = False, False, None, [] + + def start(self): + assert self.state == (False, + False), ('{}.start() can only be called on a new node.').format(type(self).__name__) + assert self._context is None + self._started = True + try: + self._context = self.parent.services.args_for(self.wrapped) if self.parent else () + except Exception as exc: # pylint: disable=broad-except + self.handle_error(exc, traceback.format_exc()) + raise + + for processor in resolve_processors(self.wrapped): + try: + _processed = processor(self.wrapped, self, *self._context) + _append_to_context = next(_processed) + if _append_to_context is not None: + self._context += ensure_tuple(_append_to_context) + except Exception as exc: # pylint: disable=broad-except + self.handle_error(exc, traceback.format_exc()) + raise + self._stack.append(_processed) + + def loop(self): + """Generic loop. A bit boring. """ + while self.alive: + self.step() + sleep(self.PERIOD) + + def step(self): + """Left as an exercise for the children.""" + raise NotImplementedError('Abstract.') + + def stop(self): + assert self._started, ('{}.stop() can only be called on a previously started node.').format(type(self).__name__) + if self._stopped: + return + + assert self._context is not None + + self._stopped = True + while len(self._stack): + processor = self._stack.pop() + try: + # todo yield from ? how to ? + next(processor) + except StopIteration as exc: + # This is normal, and wanted. + pass + except Exception as exc: # pylint: disable=broad-except + self.handle_error(exc, traceback.format_exc()) + raise + else: + # No error ? We should have had StopIteration ... + raise RuntimeError('Context processors should not yield more than once.') + + def handle_error(self, exc, trace): + """ + Error handler. Whatever happens in a plugin or component, if it looks like an exception, taste like an exception + or somehow make me think it is an exception, I'll handle it. + + :param exc: the culprit + :param trace: Hercule Poirot's logbook. + :return: to hell + """ + + from colorama import Fore, Style + print( + Style.BRIGHT, + Fore.RED, + '\U0001F4A3 {} in {}'.format(type(exc).__name__, self.wrapped), + Style.RESET_ALL, + sep='', + file=sys.stderr, + ) + print(trace) \ No newline at end of file diff --git a/bonobo/execution/graph.py b/bonobo/execution/graph.py new file mode 100644 index 0000000..d5e241f --- /dev/null +++ b/bonobo/execution/graph.py @@ -0,0 +1,68 @@ +from functools import partial + +from bonobo.config.services import Container +from bonobo.constants import BEGIN, END +from bonobo.execution.node import NodeExecutionContext +from bonobo.execution.plugin import PluginExecutionContext + + +class GraphExecutionContext: + @property + def started(self): + return any(node.started for node in self.nodes) + + @property + def stopped(self): + return all(node.started and node.stopped for node in self.nodes) + + @property + def alive(self): + return any(node.alive for node in self.nodes) + + def __init__(self, graph, plugins=None, services=None): + self.graph = graph + self.nodes = [NodeExecutionContext(node, parent=self) for node in self.graph.nodes] + self.plugins = [PluginExecutionContext(plugin, parent=self) for plugin in plugins or ()] + self.services = Container(services) if services else Container() + + for i, component_context in enumerate(self): + try: + component_context.outputs = [self[j].input for j in self.graph.outputs_of(i)] + except KeyError: + continue + + component_context.input.on_begin = partial(component_context.send, BEGIN, _control=True) + component_context.input.on_end = partial(component_context.send, END, _control=True) + component_context.input.on_finalize = partial(component_context.stop) + + def __getitem__(self, item): + return self.nodes[item] + + def __len__(self): + return len(self.nodes) + + def __iter__(self): + yield from self.nodes + + def recv(self, *messages): + """Push a list of messages in the inputs of this graph's inputs, matching the output of special node "BEGIN" in + our graph.""" + + for i in self.graph.outputs_of(BEGIN): + for message in messages: + self[i].recv(message) + + def start(self): + # todo use strategy + for node in self.nodes: + node.start() + + def loop(self): + # todo use strategy + for node in self.nodes: + node.loop() + + def stop(self): + # todo use strategy + for node in self.nodes: + node.stop() diff --git a/bonobo/execution/node.py b/bonobo/execution/node.py new file mode 100644 index 0000000..0657972 --- /dev/null +++ b/bonobo/execution/node.py @@ -0,0 +1,133 @@ +import traceback +from queue import Empty +from time import sleep + +from bonobo.structs.bags import Bag, ErrorBag +from bonobo.constants import INHERIT_INPUT, NOT_MODIFIED +from bonobo.core.inputs import Input +from bonobo.core.statistics import WithStatistics +from bonobo.errors import InactiveReadableError +from bonobo.execution.base import LoopingExecutionContext +from bonobo.util.iterators import iter_if_not_sequence + + +class NodeExecutionContext(WithStatistics, LoopingExecutionContext): + """ + todo: make the counter dependant of parent context? + """ + + @property + def alive(self): + """todo check if this is right, and where it is used""" + return self.input.alive and self._started and not self._stopped + + def __init__(self, wrapped, parent): + LoopingExecutionContext.__init__(self, wrapped, parent) + WithStatistics.__init__(self, 'in', 'out', 'err') + + self.input = Input() + self.outputs = [] + + def __str__(self): + return (('+' if self.alive else '-') + ' ' + self.__name__ + ' ' + self.get_statistics_as_string()).strip() + + def __repr__(self): + return '<' + self.__str__() + '>' + + def recv(self, *messages): + """ + Push a message list to this context's input queue. + + :param mixed value: message + """ + for message in messages: + self.input.put(message) + + def send(self, value, _control=False): + """ + Sends a message to all of this context's outputs. + + :param mixed value: message + :param _control: if true, won't count in statistics. + """ + if not _control: + self.increment('out') + for output in self.outputs: + output.put(value) + + def get(self): + """ + Get from the queue first, then increment stats, so if Queue raise Timeout or Empty, stat won't be changed. + + """ + row = self.input.get(timeout=self.PERIOD) + self.increment('in') + return row + + def loop(self): + while True: + try: + self.step() + except KeyboardInterrupt: + raise + except InactiveReadableError: + break + except Empty: + sleep(self.PERIOD) + continue + except Exception as exc: # pylint: disable=broad-except + self.handle_error(exc, traceback.format_exc()) + + def step(self): + # Pull data from the first available input channel. + """Runs a transformation callable with given args/kwargs and flush the result into the right + output channel.""" + + input_bag = self.get() + + # todo add timer + self.handle_results(input_bag, input_bag.apply(self.wrapped, *self._context)) + + def handle_results(self, input_bag, results): + # self._exec_time += timer.duration + # Put data onto output channels + try: + results = iter_if_not_sequence(results) + except TypeError: # not an iterator + if results: + if isinstance(results, ErrorBag): + results.apply(self.handle_error) + else: + self.send(_resolve(input_bag, results)) + else: + # case with no result, an execution went through anyway, use for stats. + # self._exec_count += 1 + pass + else: + while True: # iterator + try: + output = next(results) + except StopIteration: + break + else: + if isinstance(output, ErrorBag): + output.apply(self.handle_error) + else: + self.send(_resolve(input_bag, output)) + + +def _resolve(input_bag, output): + # NotModified means to send the input unmodified to output. + if output is NOT_MODIFIED: + return input_bag + + # If it does not look like a bag, let's create one for easier manipulation + if hasattr(output, 'apply'): + # Already a bag? Check if we need to set parent. + if INHERIT_INPUT in output.flags: + output.set_parent(input_bag) + else: + # Not a bag? Let's encapsulate it. + output = Bag(output) + + return output diff --git a/bonobo/execution/plugin.py b/bonobo/execution/plugin.py new file mode 100644 index 0000000..ec1c5bd --- /dev/null +++ b/bonobo/execution/plugin.py @@ -0,0 +1,34 @@ +import traceback + +from bonobo.execution.base import LoopingExecutionContext + + +class PluginExecutionContext(LoopingExecutionContext): + PERIOD = 0.5 + + def __init__(self, wrapped, parent): + # Instanciate plugin. This is not yet considered stable, as at some point we may need a way to configure + # plugins, for example if it depends on an external service. + super().__init__(wrapped(self), parent) + + def start(self): + super().start() + + try: + self.wrapped.initialize() + except Exception as exc: # pylint: disable=broad-except + self.handle_error(exc, traceback.format_exc()) + + def shutdown(self): + try: + self.wrapped.finalize() + except Exception as exc: # pylint: disable=broad-except + self.handle_error(exc, traceback.format_exc()) + finally: + self.alive = False + + def step(self): + try: + self.wrapped.run() + except Exception as exc: # pylint: disable=broad-except + self.handle_error(exc, traceback.format_exc()) \ No newline at end of file diff --git a/bonobo/strategies/base.py b/bonobo/strategies/base.py index 7758241..4b345d4 100644 --- a/bonobo/strategies/base.py +++ b/bonobo/strategies/base.py @@ -1,4 +1,4 @@ -from bonobo.execution import GraphExecutionContext +from bonobo.execution.graph import GraphExecutionContext class Strategy: diff --git a/bonobo/util/iterators.py b/bonobo/util/iterators.py index 0a79f96..cdf02a7 100644 --- a/bonobo/util/iterators.py +++ b/bonobo/util/iterators.py @@ -20,4 +20,10 @@ def force_iterator(mixed): def ensure_tuple(tuple_or_mixed): if isinstance(tuple_or_mixed, tuple): return tuple_or_mixed - return (tuple_or_mixed,) \ No newline at end of file + return (tuple_or_mixed,) + + +def iter_if_not_sequence(mixed): + if isinstance(mixed, (dict, list, str)): + raise TypeError(type(mixed).__name__) + return iter(mixed) \ No newline at end of file diff --git a/bonobo/util/testing.py b/bonobo/util/testing.py index 6f3fa57..ca400cc 100644 --- a/bonobo/util/testing.py +++ b/bonobo/util/testing.py @@ -1,6 +1,6 @@ from unittest.mock import MagicMock -from bonobo.execution import NodeExecutionContext +from bonobo.execution.node import NodeExecutionContext class CapturingNodeExecutionContext(NodeExecutionContext): diff --git a/setup.py b/setup.py index 211bc09..6bd307a 100644 --- a/setup.py +++ b/setup.py @@ -40,36 +40,40 @@ setup( name='bonobo', description='Bonobo', license='Apache License, Version 2.0', - install_requires=['colorama >=0.3,<0.4', 'psutil >=5.2,<5.3', 'requests >=2.12,<2.13', 'stevedore >=1.19,<1.20'], + install_requires=[ + 'colorama >=0.3,<0.4', 'psutil >=5.2,<5.3', 'requests >=2.13,<2.14', + 'stevedore >=1.19,<1.20' + ], version=version, long_description=read('README.rst'), classifiers=read('classifiers.txt', tolines), packages=find_packages(exclude=['ez_setup', 'example', 'test']), include_package_data=True, - data_files=[ - ( - 'share/jupyter/nbextensions/bonobo-jupyter', [ - 'bonobo/ext/jupyter/static/extension.js', 'bonobo/ext/jupyter/static/index.js', - 'bonobo/ext/jupyter/static/index.js.map' - ] - ) - ], + data_files=[('share/jupyter/nbextensions/bonobo-jupyter', [ + 'bonobo/ext/jupyter/static/extension.js', + 'bonobo/ext/jupyter/static/index.js', + 'bonobo/ext/jupyter/static/index.js.map' + ])], extras_require={ 'dev': [ - 'coverage >=4.3,<4.4', 'mock >=2.0,<2.1', 'nose >=1.3,<1.4', 'pylint >=1.6,<1.7', 'pytest >=3,<4', - 'pytest-cov >=2.4,<2.5', 'pytest-timeout >=1.2,<1.3', 'sphinx', 'sphinx_rtd_theme', 'yapf' + 'coverage >=4.3,<4.4', 'mock >=2.0,<2.1', 'pylint >=1,<2', + 'pytest >=3,<4', 'pytest-cov >=2,<3', 'pytest-timeout >=1,<2', + 'sphinx', 'sphinx_rtd_theme', 'yapf' ], 'jupyter': ['jupyter >=1.0,<1.1', 'ipywidgets >=6.0.0.beta5'] }, entry_points={ 'bonobo.commands': [ - 'init = bonobo.commands.init:register', 'run = bonobo.commands.run:register', + 'init = bonobo.commands.init:register', + 'run = bonobo.commands.run:register', 'version = bonobo.commands.version:register' ], 'console_scripts': ['bonobo = bonobo.commands:entrypoint'], - 'edgy.project.features': ['bonobo = ' - 'bonobo.ext.edgy.project.feature:BonoboFeature'] + 'edgy.project.features': + ['bonobo = ' + 'bonobo.ext.edgy.project.feature:BonoboFeature'] }, url='https://www.bonobo-project.org/', - download_url='https://github.com/python-bonobo/bonobo/tarball/{version}'.format(version=version), -) + download_url= + 'https://github.com/python-bonobo/bonobo/tarball/{version}'.format( + version=version), ) diff --git a/tests/io/test_csv.py b/tests/io/test_csv.py index 87904a9..0e09a34 100644 --- a/tests/io/test_csv.py +++ b/tests/io/test_csv.py @@ -2,7 +2,7 @@ import pytest from bonobo import Bag, CsvReader, CsvWriter from bonobo.constants import BEGIN, END -from bonobo.execution import NodeExecutionContext +from bonobo.execution.node import NodeExecutionContext from bonobo.util.testing import CapturingNodeExecutionContext diff --git a/tests/io/test_file.py b/tests/io/test_file.py index 234a323..71f6785 100644 --- a/tests/io/test_file.py +++ b/tests/io/test_file.py @@ -2,7 +2,7 @@ import pytest from bonobo import Bag, FileReader, FileWriter from bonobo.constants import BEGIN, END -from bonobo.execution import NodeExecutionContext +from bonobo.execution.node import NodeExecutionContext from bonobo.util.testing import CapturingNodeExecutionContext diff --git a/tests/io/test_json.py b/tests/io/test_json.py index 8f6d84b..1c8c124 100644 --- a/tests/io/test_json.py +++ b/tests/io/test_json.py @@ -2,7 +2,7 @@ import pytest from bonobo import Bag, JsonReader, JsonWriter from bonobo.constants import BEGIN, END -from bonobo.execution import NodeExecutionContext +from bonobo.execution.node import NodeExecutionContext from bonobo.util.testing import CapturingNodeExecutionContext diff --git a/tests/test_execution.py b/tests/test_execution.py index dafd7e1..e2fe1c2 100644 --- a/tests/test_execution.py +++ b/tests/test_execution.py @@ -1,7 +1,7 @@ from bonobo import Graph, NaiveStrategy, Bag from bonobo.config.processors import contextual from bonobo.constants import BEGIN, END -from bonobo.execution import GraphExecutionContext +from bonobo.execution.graph import GraphExecutionContext def generate_integers(): From 946c5fd1bb6077c07c67d4be1446150ac42b5306 Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Tue, 25 Apr 2017 23:18:15 +0200 Subject: [PATCH 4/5] Small refactoring of duplicate code. --- bonobo/execution/node.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/bonobo/execution/node.py b/bonobo/execution/node.py index 0657972..14bea86 100644 --- a/bonobo/execution/node.py +++ b/bonobo/execution/node.py @@ -88,6 +88,12 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): # todo add timer self.handle_results(input_bag, input_bag.apply(self.wrapped, *self._context)) + def push(self, bag): + # MAKE THIS PUBLIC API FOR CONTEXT PROCESSORS !!! + # xxx handle error or send in first call to apply(...)? + # xxx return value ? + bag.apply(self.handle_error) if is_error(bag) else self.send(bag) + def handle_results(self, input_bag, results): # self._exec_time += timer.duration # Put data onto output channels @@ -95,10 +101,7 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): results = iter_if_not_sequence(results) except TypeError: # not an iterator if results: - if isinstance(results, ErrorBag): - results.apply(self.handle_error) - else: - self.send(_resolve(input_bag, results)) + self.push(_resolve(input_bag, results)) else: # case with no result, an execution went through anyway, use for stats. # self._exec_count += 1 @@ -106,21 +109,23 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): else: while True: # iterator try: - output = next(results) + result = next(results) except StopIteration: break else: - if isinstance(output, ErrorBag): - output.apply(self.handle_error) - else: - self.send(_resolve(input_bag, output)) + self.push(_resolve(input_bag, result)) +def is_error(bag): + return isinstance(bag, ErrorBag) def _resolve(input_bag, output): # NotModified means to send the input unmodified to output. if output is NOT_MODIFIED: return input_bag + if is_error(output): + return output + # If it does not look like a bag, let's create one for easier manipulation if hasattr(output, 'apply'): # Already a bag? Check if we need to set parent. From 08db4213b321e3d6f39639043808fac95332fe56 Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Wed, 26 Apr 2017 01:26:11 +0200 Subject: [PATCH 5/5] Allowing container lazy resolution of params and fixing unterminated late transforms in default strategy (bad bug!) --- bonobo/__init__.py | 6 ++++++ bonobo/config/services.py | 21 ++++++++++++++++----- bonobo/examples/datasets/fablabs.txt | 3 ++- bonobo/strategies/executor.py | 1 + 4 files changed, 25 insertions(+), 6 deletions(-) diff --git a/bonobo/__init__.py b/bonobo/__init__.py index 39f51a1..50eb44d 100644 --- a/bonobo/__init__.py +++ b/bonobo/__init__.py @@ -54,6 +54,12 @@ def get_examples_path(*pathsegments): def create_strategy(name=None): + """ + Create a strategy, or just returns it if it's already one. + + :param name: + :return: Strategy + """ from bonobo.strategies.base import Strategy import logging diff --git a/bonobo/config/services.py b/bonobo/config/services.py index 2dc38a4..8634868 100644 --- a/bonobo/config/services.py +++ b/bonobo/config/services.py @@ -1,4 +1,5 @@ import re +import types from bonobo.config.options import Option @@ -10,7 +11,6 @@ def validate_service_name(name): raise ValueError('Invalid service name {!r}.'.format(name)) return name - class Service(Option): """ A Service is a special kind of option defining a dependency to something that will be resolved at runtime, using an @@ -48,10 +48,7 @@ class Service(Option): inst.__options_values__[self.name] = validate_service_name(value) def resolve(self, inst, services): - name = getattr(inst, self.name) - if not name in services: - raise KeyError('Cannot resolve service {!r} using provided service collection.'.format(name)) - return services.get(name) + return services.get(getattr(inst, self.name)) class Container(dict): @@ -75,3 +72,17 @@ class Container(dict): for name, option in options.items() if isinstance(option, Service) ) + + def get(self, name, default=None): + if not name in self: + if default: + return default + raise KeyError('Cannot resolve service {!r} using provided service collection.'.format(name)) + value = super().get(name) + if isinstance(value, types.LambdaType): + value = value(self) + return value + + + + diff --git a/bonobo/examples/datasets/fablabs.txt b/bonobo/examples/datasets/fablabs.txt index bf5cc4a..00ee95e 100644 --- a/bonobo/examples/datasets/fablabs.txt +++ b/bonobo/examples/datasets/fablabs.txt @@ -133,4 +133,5 @@ {"city": "Bron", "kind_name": "fab_lab", "links": ["http://fablab-lyon.fr"], "capabilities": "three_d_printing;cnc_milling;laser;vinyl_cutting", "url": "https://www.fablabs.io/labs/fabriquedobjetslibres", "name": "Fabrique d'Objets Libres", "email": "contact@fabriquedobjetslibres.fr", "coordinates": [45.7429334, 4.9082135], "header_url": "http://fablabs.io.s3.amazonaws.com/2017/01/28/13/50/01/0190e790-aaec-4f2f-9985-11156655145d/Fabrique d'Objets Libres.jpg", "county": "Rh\u00f4ne", "phone": "+33 7 68 01 40 26 (Tue-Sat 2pm-6pm)", "avatar_url": "http://fablabs.io.s3.amazonaws.com/2017/01/28/11/13/49/73ea9f2d-0216-4f52-a6bf-2ff97ee474b2/Fabrique d'Objets Libres.jpg", "postal_code": "69500", "longitude": 4.90821349999999, "country_code": "fr", "latitude": 45.7429334, "address_1": "All\u00e9e Gaillard Romanet", "address_notes": "Au sous-sol de la MJC. Downstairs inside the MJC.", "address_2": "MJC Louis Aragon", "blurb": "Le fablab lyonnais, install\u00e9 \u00e0 la MJC Louis Aragon de Bron, ouvert tous les mercredis et formation hebdomadaire de fabrication num\u00e9rique. Projets autour du handicap, des arts et du recyclage.", "description": "La Fabrique d'Objets Libres est un fablab associatif sur Lyon et sa r\u00e9gion. Install\u00e9 \u00e0 la MJC Louis Aragon de Bron depuis janvier 2013, c'est un espace de cr\u00e9ation et de fabrication num\u00e9rique ouvert \u00e0 tous, qui permet \u00e0 chacun de d\u00e9couvrir, d'inventer et de fabriquer tout type d'objet.\r\n \r\nV\u00e9ritable laboratoire citoyen de fabrication, la Fabrique d\u2019Objets Libres met \u00e0 disposition de ses adh\u00e9rents des outils \u00e0 commande num\u00e9rique et des mati\u00e8res premi\u00e8res secondaires permettant de concevoir et de fabriquer localement des objets libres.\r\nC\u2019est une plate-forme pluridisciplinaire collaborative qui m\u00eale les profils (techniciens, informaticiens, ing\u00e9nieurs, scientifiques, bricoleurs, cr\u00e9ateurs...) et les g\u00e9n\u00e9rations afin de r\u00e9unir tous types de comp\u00e9tences.\r\n\r\nLe fablab est ouvert tous les mercredis pour les \"temps libres\", durant lesquels les adh\u00e9rents utilisent les machines librement. Par ailleurs, il propose un atelier hebdomadaire aux adh\u00e9rents de la MJC, \"De l'id\u00e9e \u00e0 l'objet\": en une dizaine de s\u00e9ances sur un trimestre, les participants apprennent \u00e0 utiliser toutes les machines du fablab pour r\u00e9aliser leurs objets, et r\u00e9fl\u00e9chissent autour d'une th\u00e9matique sociale comme le handicap, la musique, ou la ville.\r\n\r\nL'association organise \u00e9galement des \u00e9v\u00e9nements et ateliers th\u00e9matiques utilisant la fabrication num\u00e9rique autour de sujet plus vastes, comme l'art, avec les machines \u00e0 dessiner, ou le handicap, dans le cadre du projet Handilab, ou encore la fin de vie des objets, avec le Laboratoire de l'Obsolescence D\u00e9programm\u00e9e. Enfin, le fablab s'associe \u00e0 d'autres associations et des entreprises pour des projets communs.", "geometry": {"type": "Point", "coordinates": [4.9082135, 45.7429334]}, "country": "France"}, {"city": "N\u00e9ons-sur-Creuse", "kind_name": "fab_lab", "links": ["http://www.rurallab.org"], "url": "https://www.fablabs.io/labs/rurallab", "coordinates": [46.744746, 0.931698], "header_url": "http://fablabs.io.s3.amazonaws.com/2017/01/28/13/49/00/95c7b9f2-a034-4b2b-931d-43ced33ddfb1/RuralLab.jpg", "phone": "+33603318810", "avatar_url": "http://fablabs.io.s3.amazonaws.com/2017/01/28/11/12/49/ec5f7c54-e6ce-40fd-b5c5-c4142d208e6b/RuralLab.jpg", "postal_code": "36220", "longitude": 0.931697999999983, "country_code": "fr", "latitude": 46.744746, "address_1": "Rue de l'\u00c9cole", "email": "rurallab36@gmail.com", "blurb": "A FabLab in the countryside in Neons sur Creuse, France", "name": "RuralLab", "geometry": {"type": "Point", "coordinates": [0.931698, 46.744746]}, "country": "France"}, {"city": "Gif-sur-Yvette", "kind_name": "supernode", "links": ["http://fablab.digiscope.fr/#!/", "http://fablabdigiscope.wordpress.com"], "url": "https://www.fablabs.io/labs/fablabdigiscope", "name": "(Fab)Lab Digiscope", "longitude": 2.16830979999997, "header_url": "http://fablabs.io.s3.amazonaws.com/2017/01/28/13/52/18/8d63351d-c2fb-4a90-8e58-bb45422202a6/(Fab)Lab Digiscope.jpg", "avatar_url": "http://fablabs.io.s3.amazonaws.com/2017/01/28/11/15/46/51553da4-b295-426c-837f-934c311933ba/(Fab)Lab Digiscope.jpg", "postal_code": "91190", "coordinates": [48.7117632, 2.1683098], "country_code": "fr", "latitude": 48.7117632, "address_1": "660 Rue Noetzlin", "capabilities": "three_d_printing;cnc_milling;circuit_production;laser;precision_milling;vinyl_cutting", "email": "fablabdigiscope@gmail.com", "blurb": "(FAB)LAB DIGISCOPE is a fabrication laboratory dedicated to research in sciences | design | education | art | engineering and what ever field of research you come from. Open to Everyone. Book now!", "description": "(FAB)LAB DIGISCOPE is a fabrication laboratory dedicated to research in sciences | design | education | arts | engineering and what ever field of research you come from. We host Fab Academy and Bio Academy. We host Digital Fabrication Classes for EITC Master. Open to Everyone since the beginning.\r\n\r\nFablab Digiscope started in 2013 when Aviz-INRIA research team director Jean-Daniel Fekete and colleague researcher Pierre Dragicevic hired Romain Di Vozzo as a R&D Engineer to be the fablab manager of what would later become an attractive place on the new Campus Paris-Saclay. Fablab Digiscope is part of the Digiscope Project, a network of 10 high-performance platforms for interactive visualization of large datasets and complex computation for which Michel Beaudouin-Lafon is the scientific Director. Fablab Digiscope is mutualised between 10 institutions involved in research and education.\r\n\r\nRomain Di Vozzo runs and develops Fablab Digiscope everyday, trains publics, designs objects, shares creative thoughts, gives advices on designs, etc. Romain also actively collaborates to the globally distributed fablab network and with the Fab Foundation by operating as Fab Academy SuperNode, as Instructor for Fab Academy and Bio Academy, by giving conferences and workshops in France and abroad and by performing very small tasks that make the fablab network grow.", "geometry": {"type": "Point", "coordinates": [2.1683098, 48.7117632]}, "country": "France"}, -{"city": "Metz", "kind_name": "fab_lab", "links": ["http://graoulab.org/wiki", "http://graoulab.org"], "url": "https://www.fablabs.io/labs/graoulab", "coordinates": [49.1262692, 6.182086], "name": "GraouLab", "avatar_url": "http://fablabs.io.s3.amazonaws.com/2017/01/28/11/18/24/af4709d8-1f60-48a7-ba35-4c42ef40a195/GraouLab.jpg", "postal_code": "57000", "longitude": 6.18208600000003, "country_code": "fr", "latitude": 49.1262692, "capabilities": "three_d_printing;laser", "email": "contact@graoulab.org", "blurb": "The FabLab of Metz. A place for folks innovation.", "address_1": "7 Avenue de Blida", "geometry": {"type": "Point", "coordinates": [6.182086, 49.1262692]}, "country": "France"} \ No newline at end of file +{"city": "Metz", "kind_name": "fab_lab", "links": ["http://graoulab.org/wiki", "http://graoulab.org"], "url": "https://www.fablabs.io/labs/graoulab", "coordinates": [49.1262692, 6.182086], "name": "GraouLab", "avatar_url": "http://fablabs.io.s3.amazonaws.com/2017/01/28/11/18/24/af4709d8-1f60-48a7-ba35-4c42ef40a195/GraouLab.jpg", "postal_code": "57000", "longitude": 6.18208600000003, "country_code": "fr", "latitude": 49.1262692, "capabilities": "three_d_printing;laser", "email": "contact@graoulab.org", "blurb": "The FabLab of Metz. A place for folks innovation.", "address_1": "7 Avenue de Blida", "geometry": {"type": "Point", "coordinates": [6.182086, 49.1262692]}, "country": "France"} +] \ No newline at end of file diff --git a/bonobo/strategies/executor.py b/bonobo/strategies/executor.py index d3ad119..f3306ea 100644 --- a/bonobo/strategies/executor.py +++ b/bonobo/strategies/executor.py @@ -40,6 +40,7 @@ class ExecutorStrategy(Strategy): def _runner(node_context=node_context): node_context.start() node_context.loop() + node_context.stop() futures.append(executor.submit(_runner))