diff --git a/.gitignore b/.gitignore index da70ff2..179ed2e 100644 --- a/.gitignore +++ b/.gitignore @@ -23,6 +23,7 @@ .tox/ .webassets-cache /.idea +/.release /bonobo.iml /bonobo/ext/jupyter/js/node_modules/ /build/ diff --git a/ROADMAP.rst b/ROADMAP.rst new file mode 100644 index 0000000..5b9d3fa --- /dev/null +++ b/ROADMAP.rst @@ -0,0 +1,70 @@ +Roadmap +======= + + + +Milestones +:::::::::: + +* Class-tree for Graph and Nodes + +* Class-tree for execution contexts: + + * GraphExecutionContext + * NodeExecutionContext + * PluginExecutionContext + +* Class-tree for ExecutionStrategies + + * NaiveStrategy + * PoolExecutionStrategy + * ThreadPoolExecutionStrategy + * ProcesPoolExecutionStrategy + * ThreadExecutionStrategy + * ProcessExecutionStrategy + +* Class-tree for bags + + * Bag + * ErrorBag + * InheritingBag + * + +* Co-routines: for unordered, or even ordered but long io. + +* "context processors": replace initialize/finalize by a generator that yields only once + + +* "execute" function: + + .. code-block:: python + + def execute(graph: Graph, *, strategy: ExecutionStrategy, plugins: List[Plugin]) -> Execution: + pass + + + + + + +Version 0.2 +::::::::::: + +* Changelog +* Migration guide +* Update documentation +* Threaded does not terminate anymore +* More tests + +Configuration +::::::::::::: + +* Support for position arguments (options), required options are good candidates. + +Context processors +:::::::::::::::::: + +* Be careful with order, especially with python 3.5. +* @contextual decorator is not clean enough. Once the behavior is right, find a way to use regular inheritance, without meta. +* ValueHolder API not clean. Find a better way. + diff --git a/bonobo/__init__.py b/bonobo/__init__.py index 05f5d14..b2ff5ef 100644 --- a/bonobo/__init__.py +++ b/bonobo/__init__.py @@ -20,19 +20,55 @@ limitations under the License. """ import sys +import warnings assert (sys.version_info >= (3, 5)), 'Python 3.5+ is required to use Bonobo.' from ._version import __version__ +from .config import * +from .context import * from .core import * -from .io import CsvReader, CsvWriter, FileReader, FileWriter, JsonReader, JsonWriter +from .io import * from .util import * +DEFAULT_STRATEGY = 'threadpool' + +STRATEGIES = { + 'naive': NaiveStrategy, + 'processpool': ProcessPoolExecutorStrategy, + 'threadpool': ThreadPoolExecutorStrategy, +} + + +def run(graph, *chain, strategy=None, plugins=None): + from bonobo.core.strategies.base import Strategy + + if len(chain): + warnings.warn('DEPRECATED. You should pass a Graph instance instead of a chain.') + from bonobo import Graph + graph = Graph(graph, *chain) + + if not isinstance(strategy, Strategy): + if strategy is None: + strategy = DEFAULT_STRATEGY + + try: + strategy = STRATEGIES[strategy] + except KeyError as exc: + raise RuntimeError('Invalid strategy {}.'.format(repr(strategy))) from exc + + strategy = strategy() + + return strategy.execute(graph, plugins=plugins) + + __all__ = [ 'Bag', + 'Configurable', + 'ContextProcessor', + 'contextual', 'CsvReader', 'CsvWriter', - 'Configurable', 'FileReader', 'FileWriter', 'Graph', @@ -51,9 +87,9 @@ __all__ = [ 'log', 'noop', 'pprint', - 'run', 'service', 'tee', ] -del sys \ No newline at end of file +del warnings +del sys diff --git a/bonobo/compat/__init__.py b/bonobo/compat/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/bonobo/compat/pandas.py b/bonobo/compat/pandas.py deleted file mode 100644 index 153ba36..0000000 --- a/bonobo/compat/pandas.py +++ /dev/null @@ -1,9 +0,0 @@ -from bonobo import FileWriter, JsonWriter - -to_file = FileWriter -to_json = JsonWriter - -__all__ = [ - 'to_json', - 'to_file', -] diff --git a/bonobo/util/options.py b/bonobo/config.py similarity index 98% rename from bonobo/util/options.py rename to bonobo/config.py index 3618e63..e0e7514 100644 --- a/bonobo/util/options.py +++ b/bonobo/config.py @@ -33,6 +33,8 @@ class Configurable(metaclass=ConfigurableMeta): """ def __init__(self, **kwargs): + super().__init__() + self.__options_values__ = {} missing = set() diff --git a/bonobo/context/__init__.py b/bonobo/context/__init__.py new file mode 100644 index 0000000..2300929 --- /dev/null +++ b/bonobo/context/__init__.py @@ -0,0 +1,6 @@ +from bonobo.context.processors import contextual, ContextProcessor + +__all__ = [ + 'ContextProcessor', + 'contextual', +] diff --git a/bonobo/core/contexts.py b/bonobo/context/execution.py similarity index 53% rename from bonobo/core/contexts.py rename to bonobo/context/execution.py index 8a66863..1f018f8 100644 --- a/bonobo/core/contexts.py +++ b/bonobo/context/execution.py @@ -3,39 +3,31 @@ from functools import partial from queue import Empty from time import sleep +from bonobo.context.processors import get_context_processors from bonobo.core.bags import Bag, INHERIT_INPUT, ErrorBag from bonobo.core.errors import InactiveReadableError from bonobo.core.inputs import Input -from bonobo.core.stats import WithStatistics -from bonobo.util.lifecycle import get_initializer, get_finalizer -from bonobo.util.tokens import BEGIN, END, NEW, RUNNING, TERMINATED, NOT_MODIFIED +from bonobo.core.statistics import WithStatistics +from bonobo.util.objects import Wrapper +from bonobo.util.tokens import BEGIN, END, NOT_MODIFIED -def get_name(mixed): - try: - return mixed.__name__ - except AttributeError: - return type(mixed).__name__ +class GraphExecutionContext: + @property + def started(self): + return any(node.started for node in self.nodes) + @property + def stopped(self): + return all(node.started and node.stopped for node in self.nodes) -def create_component_context(component, parent): - try: - CustomComponentContext = component.Context - except AttributeError: - return ComponentExecutionContext(component, parent=parent) + @property + def alive(self): + return self.started and not self.stopped - if ComponentExecutionContext in CustomComponentContext.__mro__: - bases = (CustomComponentContext, ) - else: - bases = (CustomComponentContext, ComponentExecutionContext) - - return type(get_name(component).title() + 'ExecutionContext', bases, {})(component, parent=parent) - - -class ExecutionContext: def __init__(self, graph, plugins=None): self.graph = graph - self.components = [create_component_context(component, parent=self) for component in self.graph.components] + self.nodes = [NodeExecutionContext(node, parent=self) for node in self.graph.nodes] self.plugins = [PluginExecutionContext(plugin, parent=self) for plugin in plugins or ()] for i, component_context in enumerate(self): @@ -46,16 +38,16 @@ class ExecutionContext: component_context.input.on_begin = partial(component_context.send, BEGIN, _control=True) component_context.input.on_end = partial(component_context.send, END, _control=True) - component_context.input.on_finalize = partial(component_context.finalize) + component_context.input.on_finalize = partial(component_context.stop) def __getitem__(self, item): - return self.components[item] + return self.nodes[item] def __len__(self): - return len(self.components) + return len(self.nodes) def __iter__(self): - yield from self.components + yield from self.nodes def recv(self, *messages): """Push a list of messages in the inputs of this graph's inputs, matching the output of special node "BEGIN" in @@ -65,31 +57,67 @@ class ExecutionContext: for message in messages: self[i].recv(message) - @property - def alive(self): - return any(component.alive for component in self.components) + def start(self): + # todo use strategy + for node in self.nodes: + node.start() + + def loop(self): + # todo use strategy + for node in self.nodes: + node.loop() + + def stop(self): + # todo use strategy + for node in self.nodes: + node.stop() -class AbstractLoopContext: +def ensure_tuple(tuple_or_mixed): + if isinstance(tuple_or_mixed, tuple): + return tuple_or_mixed + return (tuple_or_mixed, ) + + +class LoopingExecutionContext(Wrapper): alive = True PERIOD = 0.25 - def __init__(self, wrapped): - self.wrapped = wrapped + @property + def state(self): + return self._started, self._stopped - def run(self): - self.initialize() - self.loop() - self.finalize() + @property + def started(self): + return self._started - def initialize(self): - # pylint: disable=broad-except - try: - initializer = get_initializer(self.wrapped) - except Exception as exc: - self.handle_error(exc, traceback.format_exc()) - else: - return initializer(self) + @property + def stopped(self): + return self._stopped + + def __init__(self, wrapped, parent): + super().__init__(wrapped) + self.parent = parent + self._started, self._stopped, self._context, self._stack = False, False, None, [] + + def start(self): + assert self.state == (False, False), ('{}.start() can only be called on a new node.' + ).format(type(self).__name__) + assert self._context is None + + self._started = True + self._context = () + for processor in get_context_processors(self.wrapped): + _processed = processor(self.wrapped, self, *self._context) + try: + # todo yield from ? + _append_to_context = next(_processed) + if _append_to_context is not None: + self._context += ensure_tuple(_append_to_context) + except Exception as exc: # pylint: disable=broad-except + self.handle_error(exc, traceback.format_exc()) + raise + self._stack.append(_processed) def loop(self): """Generic loop. A bit boring. """ @@ -103,15 +131,28 @@ class AbstractLoopContext: """ raise NotImplementedError('Abstract.') - def finalize(self): - """Generic finalizer. """ - # pylint: disable=broad-except - try: - finalizer = get_finalizer(self.wrapped) - except Exception as exc: - return self.handle_error(exc, traceback.format_exc()) - else: - return finalizer(self) + def stop(self): + assert self._started, ('{}.stop() can only be called on a previously started node.').format(type(self).__name__) + if self._stopped: + return + + assert self._context is not None + + self._stopped = True + while len(self._stack): + processor = self._stack.pop() + try: + # todo yield from ? how to ? + next(processor) + except StopIteration as exc: + # This is normal, and wanted. + pass + except Exception as exc: # pylint: disable=broad-except + self.handle_error(exc, traceback.format_exc()) + raise + else: + # No error ? We should have had StopIteration ... + raise RuntimeError('Context processors should not yield more than once.') def handle_error(self, exc, trace): """ @@ -129,11 +170,9 @@ class AbstractLoopContext: print(trace) -class PluginExecutionContext(AbstractLoopContext): - def __init__(self, plugin, parent): - self.plugin = plugin - self.parent = parent - super().__init__(self.plugin) +class PluginExecutionContext(LoopingExecutionContext): + def __init__(self, wrapped, parent): + LoopingExecutionContext.__init__(self, wrapped, parent) def shutdown(self): self.alive = False @@ -145,7 +184,7 @@ class PluginExecutionContext(AbstractLoopContext): self.handle_error(exc, traceback.format_exc()) -class ComponentExecutionContext(WithStatistics, AbstractLoopContext): +class NodeExecutionContext(WithStatistics, LoopingExecutionContext): """ todo: make the counter dependant of parent context? """ @@ -153,47 +192,20 @@ class ComponentExecutionContext(WithStatistics, AbstractLoopContext): @property def alive(self): """todo check if this is right, and where it is used""" - return self.input.alive + return self.input.alive and self._started and not self._stopped - @property - def name(self): - return getattr(self.component, '__name__', getattr(type(self.component), '__name__', repr(self.component))) + def __init__(self, wrapped, parent): + LoopingExecutionContext.__init__(self, wrapped, parent) + WithStatistics.__init__(self, 'in', 'out', 'err') - def __init__(self, component, parent): - self.parent = parent - self.component = component self.input = Input() self.outputs = [] - self.state = NEW - self.stats = { - 'in': 0, - 'out': 0, - 'err': 0, - 'read': 0, - 'write': 0, - } - super().__init__(self.component) + def __str__(self): + return (('+' if self.alive else '-') + ' ' + self.__name__ + ' ' + self.get_statistics_as_string()).strip() def __repr__(self): - """Adds "alive" information to the transform representation.""" - return ('+' if self.alive else '-') + ' ' + self.name + ' ' + self.get_stats_as_string() - - def get_stats(self, *args, **kwargs): - return ( - ( - 'in', - self.stats['in'], - ), - ( - 'out', - self.stats['out'], - ), - ( - 'err', - self.stats['err'], - ), - ) + return '<' + self.__str__() + '>' def recv(self, *messages): """ @@ -212,7 +224,7 @@ class ComponentExecutionContext(WithStatistics, AbstractLoopContext): :param _control: if true, won't count in statistics. """ if not _control: - self.stats['out'] += 1 + self.increment('out') for output in self.outputs: output.put(value) @@ -222,41 +234,9 @@ class ComponentExecutionContext(WithStatistics, AbstractLoopContext): """ row = self.input.get(timeout=self.PERIOD) - self.stats['in'] += 1 + self.increment('in') return row - def apply_on(self, bag): - # todo add timer - if getattr(self.component, '_with_context', False): - return bag.apply(self.component, self) - return bag.apply(self.component) - - def initialize(self): - # pylint: disable=broad-except - assert self.state is NEW, ( - 'A {} can only be run once, and thus is expected to be in {} state at ' - 'initialization time.' - ).format(type(self).__name__, NEW) - self.state = RUNNING - - try: - initializer_outputs = super().initialize() - self.handle(None, initializer_outputs) - except Exception as exc: - self.handle_error(exc, traceback.format_exc()) - - def finalize(self): - # pylint: disable=broad-except - assert self.state is RUNNING, ('A {} must be in {} state at finalization time.' - ).format(type(self).__name__, RUNNING) - self.state = TERMINATED - - try: - finalizer_outputs = super().finalize() - self.handle(None, finalizer_outputs) - except Exception as exc: - self.handle_error(exc, traceback.format_exc()) - def loop(self): while True: try: @@ -277,24 +257,21 @@ class ComponentExecutionContext(WithStatistics, AbstractLoopContext): output channel.""" input_bag = self.get() - outputs = self.apply_on(input_bag) - self.handle(input_bag, outputs) - def run(self): - self.initialize() - self.loop() + # todo add timer + self.handle_results(input_bag, input_bag.apply(self.wrapped, *self._context)) - def handle(self, input_bag, outputs): + def handle_results(self, input_bag, results): # self._exec_time += timer.duration # Put data onto output channels try: - outputs = _iter(outputs) + results = _iter(results) except TypeError: # not an iterator - if outputs: - if isinstance(outputs, ErrorBag): - outputs.apply(self.handle_error) + if results: + if isinstance(results, ErrorBag): + results.apply(self.handle_error) else: - self.send(_resolve(input_bag, outputs)) + self.send(_resolve(input_bag, results)) else: # case with no result, an execution went through anyway, use for stats. # self._exec_count += 1 @@ -302,7 +279,7 @@ class ComponentExecutionContext(WithStatistics, AbstractLoopContext): else: while True: # iterator try: - output = next(outputs) + output = next(results) except StopIteration: break else: diff --git a/bonobo/context/processors.py b/bonobo/context/processors.py new file mode 100644 index 0000000..96b4a91 --- /dev/null +++ b/bonobo/context/processors.py @@ -0,0 +1,45 @@ +import types + +_CONTEXT_PROCESSORS_ATTR = '__processors__' + + +def get_context_processors(mixed): + if isinstance(mixed, types.FunctionType): + yield from getattr(mixed, _CONTEXT_PROCESSORS_ATTR, ()) + + for cls in reversed((mixed if isinstance(mixed, type) else type(mixed)).__mro__): + yield from cls.__dict__.get(_CONTEXT_PROCESSORS_ATTR, ()) + + return () + + +class ContextProcessor: + @property + def __name__(self): + return self.func.__name__ + + def __init__(self, func): + self.func = func + + def __repr__(self): + return repr(self.func).replace(' 0)) - def get_stats_as_string(self, *args, **kwargs): - return ' '.join(('{0}={1}'.format(name, cnt) for name, cnt in self.get_stats(*args, **kwargs) if cnt > 0)) + def increment(self, name): + self.statistics[name] += 1 diff --git a/bonobo/core/strategies/base.py b/bonobo/core/strategies/base.py index 6d09f22..08262a8 100644 --- a/bonobo/core/strategies/base.py +++ b/bonobo/core/strategies/base.py @@ -1,4 +1,4 @@ -from bonobo.core.contexts import ExecutionContext +from bonobo.context.execution import GraphExecutionContext class Strategy: @@ -6,10 +6,10 @@ class Strategy: Base class for execution strategies. """ - context_type = ExecutionContext + graph_execution_context_factory = GraphExecutionContext - def create_context(self, graph, *args, **kwargs): - return self.context_type(graph, *args, **kwargs) + def create_graph_execution_context(self, graph, *args, **kwargs): + return self.graph_execution_context_factory(graph, *args, **kwargs) def execute(self, graph, *args, **kwargs): raise NotImplementedError diff --git a/bonobo/core/strategies/executor.py b/bonobo/core/strategies/executor.py index 804c7e2..6be536b 100644 --- a/bonobo/core/strategies/executor.py +++ b/bonobo/core/strategies/executor.py @@ -17,19 +17,34 @@ class ExecutorStrategy(Strategy): executor_factory = Executor + def create_executor(self): + return self.executor_factory() + def execute(self, graph, *args, plugins=None, **kwargs): - context = self.create_context(graph, plugins=plugins) + context = self.create_graph_execution_context(graph, plugins=plugins) + context.recv(BEGIN, Bag(), END) - executor = self.executor_factory() + executor = self.create_executor() futures = [] for plugin_context in context.plugins: - futures.append(executor.submit(plugin_context.run)) - for component_context in context.components: - futures.append(executor.submit(component_context.run)) + def _runner(plugin_context=plugin_context): + plugin_context.start() + plugin_context.loop() + plugin_context.stop() + + futures.append(executor.submit(_runner)) + + for node_context in context.nodes: + + def _runner(node_context=node_context): + node_context.start() + node_context.loop() + + futures.append(executor.submit(_runner)) while context.alive: time.sleep(0.2) @@ -52,7 +67,7 @@ class ProcessPoolExecutorStrategy(ExecutorStrategy): class ThreadCollectionStrategy(Strategy): def execute(self, graph, *args, plugins=None, **kwargs): - context = self.create_context(graph, plugins=plugins) + context = self.create_graph_execution_context(graph, plugins=plugins) context.recv(BEGIN, Bag(), END) threads = [] diff --git a/bonobo/core/strategies/naive.py b/bonobo/core/strategies/naive.py index e39d58a..1ea9fd4 100644 --- a/bonobo/core/strategies/naive.py +++ b/bonobo/core/strategies/naive.py @@ -6,12 +6,12 @@ from ..bags import Bag class NaiveStrategy(Strategy): def execute(self, graph, *args, plugins=None, **kwargs): - context = self.create_context(graph, plugins=plugins) + context = self.create_graph_execution_context(graph, plugins=plugins) context.recv(BEGIN, Bag(), END) # TODO: how to run plugins in "naive" mode ? - - for component in context.components: - component.run() + context.start() + context.loop() + context.stop() return context diff --git a/bonobo/ext/console/plugin.py b/bonobo/ext/console/plugin.py index 699a6c5..c6ee3fc 100644 --- a/bonobo/ext/console/plugin.py +++ b/bonobo/ext/console/plugin.py @@ -86,7 +86,7 @@ class ConsoleOutputPlugin(Plugin): ' ', component.name, ' ', - component.get_stats_as_string( + component.get_statistics_as_string( debug=debug, profile=profile ), ' ', @@ -100,7 +100,7 @@ class ConsoleOutputPlugin(Plugin): ' - ', component.name, ' ', - component.get_stats_as_string( + component.get_statistics_as_string( debug=debug, profile=profile ), ' ', diff --git a/bonobo/ext/opendatasoft.py b/bonobo/ext/opendatasoft.py index f996801..376b5d7 100644 --- a/bonobo/ext/opendatasoft.py +++ b/bonobo/ext/opendatasoft.py @@ -30,7 +30,7 @@ def from_opendatasoft_api( break for row in records: - yield { ** row.get('fields', {}), 'geometry': row.get('geometry', {})} + yield {**row.get('fields', {}), 'geometry': row.get('geometry', {})} start += rows diff --git a/bonobo/io/csv.py b/bonobo/io/csv.py index 72998d2..431fc94 100644 --- a/bonobo/io/csv.py +++ b/bonobo/io/csv.py @@ -1,18 +1,12 @@ import csv -from copy import copy +from bonobo import Option, ContextProcessor, contextual +from bonobo.util.objects import ValueHolder from .file import FileReader, FileWriter, FileHandler class CsvHandler(FileHandler): - delimiter = ';' - quotechar = '"' - headers = None - - -class CsvReader(CsvHandler, FileReader): """ - Reads a CSV and yield the values as dicts. .. attribute:: delimiter @@ -26,30 +20,33 @@ class CsvReader(CsvHandler, FileReader): The list of column names, if the CSV does not contain it as its first line. + """ + delimiter = Option(str, default=';') + quotechar = Option(str, default='"') + headers = Option(tuple) + + +@contextual +class CsvReader(CsvHandler, FileReader): + """ + Reads a CSV and yield the values as dicts. + .. attribute:: skip The amount of lines to skip before it actually yield output. """ - skip = 0 + skip = Option(int, default=0) - def __init__(self, path_or_buf, delimiter=None, quotechar=None, headers=None, skip=None): - super().__init__(path_or_buf) + @ContextProcessor + def csv_headers(self, context, file): + yield ValueHolder(self.headers) - self.delimiter = str(delimiter or self.delimiter) - self.quotechar = quotechar or self.quotechar - self.headers = headers or self.headers - self.skip = skip or self.skip - - @property - def has_headers(self): - return bool(self.headers) - - def read(self, ctx): - reader = csv.reader(ctx.file, delimiter=self.delimiter, quotechar=self.quotechar) - headers = self.has_headers and self.headers or next(reader) - field_count = len(headers) + def read(self, file, headers): + reader = csv.reader(file, delimiter=self.delimiter, quotechar=self.quotechar) + headers.value = headers.value or next(reader) + field_count = len(headers.value) if self.skip and self.skip > 0: for i in range(0, self.skip): @@ -62,30 +59,20 @@ class CsvReader(CsvHandler, FileReader): field_count, )) - yield dict(zip(headers, row)) + yield dict(zip(headers.value, row)) +@contextual class CsvWriter(CsvHandler, FileWriter): - def __init__(self, path_or_buf, delimiter=None, quotechar=None, headers=None): - super().__init__(path_or_buf) + @ContextProcessor + def writer(self, context, file, lineno): + writer = csv.writer(file, delimiter=self.delimiter, quotechar=self.quotechar) + headers = ValueHolder(list(self.headers) if self.headers else None) + yield writer, headers - self.delimiter = str(delimiter or self.delimiter) - self.quotechar = quotechar or self.quotechar - self.headers = headers or self.headers - - def initialize(self, ctx): - super().initialize(ctx) - ctx.writer = csv.writer(ctx.file, delimiter=self.delimiter, quotechar=self.quotechar) - ctx.headers = copy(self.headers) - ctx.first = True - - def write(self, ctx, row): - if ctx.first: - ctx.headers = ctx.headers or row.keys() - ctx.writer.writerow(ctx.headers) - ctx.first = False - ctx.writer.writerow(row[header] for header in ctx.headers) - - def finalize(self, ctx): - del ctx.headers, ctx.writer, ctx.first - super().finalize(ctx) + def write(self, file, lineno, writer, headers, row): + if not lineno.value: + headers.value = headers.value or row.keys() + writer.writerow(headers.value) + writer.writerow(row[header] for header in headers.value) + lineno.value += 1 diff --git a/bonobo/io/file.py b/bonobo/io/file.py index 71be39e..2fd9997 100644 --- a/bonobo/io/file.py +++ b/bonobo/io/file.py @@ -1,63 +1,47 @@ -from bonobo.util.lifecycle import with_context +from bonobo.config import Configurable, Option +from bonobo.context import ContextProcessor +from bonobo.context.processors import contextual +from bonobo.util.objects import ValueHolder __all__ = [ - 'FileHandler', 'FileReader', 'FileWriter', ] -@with_context -class FileHandler: +@contextual +class FileHandler(Configurable): """ Abstract component factory for file-related components. """ - eol = '\n' - mode = None + path = Option(str, required=True) + eol = Option(str, default='\n') + mode = Option(str) - def __init__(self, path_or_buf, eol=None): - self.path_or_buf = path_or_buf - self.eol = eol or self.eol + @ContextProcessor + def file(self, context): + with self.open() as file: + yield file def open(self): - return open(self.path_or_buf, self.mode) - - def close(self, fp): - """ - :param file fp: - """ - fp.close() - - def initialize(self, ctx): - """ - Initialize a - :param ctx: - :return: - """ - - assert not hasattr(ctx, 'file'), 'A file pointer is already in the context... I do not know what to say...' - ctx.file = self.open() - - def finalize(self, ctx): - self.close(ctx.file) - del ctx.file + return open(self.path, self.mode) class Reader(FileHandler): - def __call__(self, ctx): - yield from self.read(ctx) + def __call__(self, *args): + yield from self.read(*args) - def read(self, ctx): + def read(self, *args): raise NotImplementedError('Abstract.') class Writer(FileHandler): - def __call__(self, ctx, row): - return self.write(ctx, row) + def __call__(self, *args): + return self.write(*args) - def write(self, ctx, row): + def write(self, *args): raise NotImplementedError('Abstract.') @@ -70,20 +54,21 @@ class FileReader(Reader): """ - mode = 'r' + mode = Option(str, default='r') - def read(self, ctx): + def read(self, file): """ - Write a row on the next line of file pointed by `ctx.file`. + Write a row on the next line of given file. Prefix is used for newlines. :param ctx: :param row: """ - for line in ctx.file: + for line in file: yield line.rstrip(self.eol) +@contextual class FileWriter(Writer): """ Component factory for file or file-like writers. @@ -93,13 +78,14 @@ class FileWriter(Writer): """ - mode = 'w+' + mode = Option(str, default='w+') - def initialize(self, ctx): - ctx.line = 0 - return super().initialize(ctx) + @ContextProcessor + def lineno(self, context, file): + lineno = ValueHolder(0, type=int) + yield lineno - def write(self, ctx, row): + def write(self, file, lineno, row): """ Write a row on the next line of opened file in context. @@ -107,12 +93,8 @@ class FileWriter(Writer): :param str row: :param str prefix: """ - self._write_line(ctx.file, (self.eol if ctx.line else '') + row) - ctx.line += 1 + self._write_line(file, (self.eol if lineno.value else '') + row) + lineno.value += 1 - def _write_line(self, fp, line): - return fp.write(line) - - def finalize(self, ctx): - del ctx.line - return super().finalize(ctx) + def _write_line(self, file, line): + return file.write(line) diff --git a/bonobo/io/json.py b/bonobo/io/json.py index f94105e..9c50932 100644 --- a/bonobo/io/json.py +++ b/bonobo/io/json.py @@ -1,5 +1,6 @@ import json +from bonobo import ContextProcessor, contextual from .file import FileWriter, FileReader __all__ = ['JsonWriter', ] @@ -10,25 +11,24 @@ class JsonHandler: class JsonReader(JsonHandler, FileReader): - def read(self, ctx): - for line in json.load(ctx.file): + def read(self, file): + for line in json.load(file): yield line +@contextual class JsonWriter(JsonHandler, FileWriter): - def initialize(self, ctx): - super().initialize(ctx) - ctx.file.write('[\n') + @ContextProcessor + def envelope(self, context, file, lineno): + file.write('[\n') + yield + file.write('\n]') - def write(self, ctx, row): + def write(self, file, lineno, row): """ Write a json row on the next line of file pointed by ctx.file. :param ctx: :param row: """ - return super().write(ctx, json.dumps(row)) - - def finalize(self, ctx): - ctx.file.write('\n]') - super().finalize(ctx) + return super().write(file, lineno, json.dumps(row)) diff --git a/bonobo/util/__init__.py b/bonobo/util/__init__.py index eece5b9..69fdebc 100644 --- a/bonobo/util/__init__.py +++ b/bonobo/util/__init__.py @@ -5,13 +5,10 @@ from pprint import pprint as _pprint import blessings -from .helpers import run, console_run, jupyter_run +from .helpers import console_run, jupyter_run from .tokens import NOT_MODIFIED -from .options import Configurable, Option __all__ = [ - 'Configurable', - 'Option', 'NOT_MODIFIED', 'console_run', 'jupyter_run', @@ -19,7 +16,6 @@ __all__ = [ 'log', 'noop', 'pprint', - 'run', 'tee', ] diff --git a/bonobo/util/helpers.py b/bonobo/util/helpers.py index 106b04e..c5d468a 100644 --- a/bonobo/util/helpers.py +++ b/bonobo/util/helpers.py @@ -1,25 +1,12 @@ -def run(*chain, plugins=None, strategy=None): - from bonobo import Graph, ThreadPoolExecutorStrategy - - if len(chain) == 1 and isinstance(chain[0], Graph): - graph = chain[0] - elif len(chain) >= 1: - graph = Graph() - graph.add_chain(*chain) - else: - raise RuntimeError('Empty chain.') - - executor = (strategy or ThreadPoolExecutorStrategy)() - return executor.execute(graph, plugins=plugins or []) - - def console_run(*chain, output=True, plugins=None, strategy=None): + from bonobo import run from bonobo.ext.console import ConsoleOutputPlugin return run(*chain, plugins=(plugins or []) + [ConsoleOutputPlugin()] if output else [], strategy=strategy) def jupyter_run(*chain, plugins=None, strategy=None): + from bonobo import run from bonobo.ext.jupyter import JupyterOutputPlugin return run(*chain, plugins=(plugins or []) + [JupyterOutputPlugin()], strategy=strategy) diff --git a/bonobo/util/objects.py b/bonobo/util/objects.py new file mode 100644 index 0000000..f6d0a93 --- /dev/null +++ b/bonobo/util/objects.py @@ -0,0 +1,22 @@ +def get_name(mixed): + try: + return mixed.__name__ + except AttributeError: + return type(mixed).__name__ + + +class Wrapper: + def __init__(self, wrapped): + self.wrapped = wrapped + + @property + def __name__(self): + return getattr(self.wrapped, '__name__', getattr(type(self.wrapped), '__name__', repr(self.wrapped))) + + name = __name__ + + +class ValueHolder: + def __init__(self, value, *, type=None): + self.value = value + self.type = type \ No newline at end of file diff --git a/bonobo/util/testing.py b/bonobo/util/testing.py index 30431c3..2e0d20a 100644 --- a/bonobo/util/testing.py +++ b/bonobo/util/testing.py @@ -1,9 +1,9 @@ from unittest.mock import MagicMock -from bonobo.core.contexts import ComponentExecutionContext +from bonobo.context.execution import NodeExecutionContext -class CapturingComponentExecutionContext(ComponentExecutionContext): - def __init__(self, component, parent): - super().__init__(component, parent) +class CapturingNodeExecutionContext(NodeExecutionContext): + def __init__(self, wrapped, parent): + super().__init__(wrapped, parent) self.send = MagicMock() diff --git a/bonobo/util/tokens.py b/bonobo/util/tokens.py index 98c0fae..5dae1de 100644 --- a/bonobo/util/tokens.py +++ b/bonobo/util/tokens.py @@ -11,8 +11,4 @@ class Token: BEGIN = Token('Begin') END = Token('End') -NEW = Token('New') -RUNNING = Token('Running') -TERMINATED = Token('Terminated') - NOT_MODIFIED = Token('NotModified') diff --git a/examples/basic_extract_transform_load_of_bags.py b/examples/basics_bags.py similarity index 61% rename from examples/basic_extract_transform_load_of_bags.py rename to examples/basics_bags.py index a5ac188..67f0087 100644 --- a/examples/basic_extract_transform_load_of_bags.py +++ b/examples/basics_bags.py @@ -3,8 +3,6 @@ from random import randint from bonobo import Bag from bonobo.core.graphs import Graph -from bonobo.core.strategies.executor import ThreadPoolExecutorStrategy -from bonobo.ext.console import ConsoleOutputPlugin def extract(): @@ -23,11 +21,10 @@ def load(topic: str, title: str, wait: int): print('{} ({}) wait={}'.format(title, topic, wait)) -Strategy = ThreadPoolExecutorStrategy +graph = Graph() +graph.add_chain(extract, transform, load) if __name__ == '__main__': - etl = Graph() - etl.add_chain(extract, transform, load) + from bonobo.util.helpers import run - s = Strategy() - s.execute(etl, plugins=[ConsoleOutputPlugin()]) + run(graph) diff --git a/examples/basic_extract_transform_load_of_dicts.py b/examples/basics_dicts.py similarity index 56% rename from examples/basic_extract_transform_load_of_dicts.py rename to examples/basics_dicts.py index aeb7f59..76e117d 100644 --- a/examples/basic_extract_transform_load_of_dicts.py +++ b/examples/basics_dicts.py @@ -2,8 +2,6 @@ import time from random import randint from bonobo.core.graphs import Graph -from bonobo.core.strategies.executor import ThreadPoolExecutorStrategy -from bonobo.ext.console import ConsoleOutputPlugin def extract(): @@ -25,11 +23,10 @@ def load(s): print(s) -Strategy = ThreadPoolExecutorStrategy +graph = Graph() +graph.add_chain(extract, transform, load) if __name__ == '__main__': - etl = Graph() - etl.add_chain(extract, transform, load) + from bonobo import run - s = Strategy() - s.execute(etl, plugins=[ConsoleOutputPlugin()]) + run(graph) diff --git a/examples/basics_file.py b/examples/basics_file.py new file mode 100644 index 0000000..a56e7e7 --- /dev/null +++ b/examples/basics_file.py @@ -0,0 +1,20 @@ +from bonobo import FileReader, Graph + + +def skip_comments(line): + if not line.startswith('#'): + yield line + + +graph = Graph( + FileReader(path='/etc/passwd'), + skip_comments, + lambda s: s.split(':'), + lambda l: l[0], + print, +) + +if __name__ == '__main__': + import bonobo + + bonobo.run(graph) diff --git a/examples/basics_file_csv.py b/examples/basics_file_csv.py new file mode 100644 index 0000000..c13bfcb --- /dev/null +++ b/examples/basics_file_csv.py @@ -0,0 +1,21 @@ +import os + +from bonobo import CsvReader, Graph + +__path__ = os.path.dirname(__file__) + + +def skip_comments(line): + if not line.startswith('#'): + yield line + + +graph = Graph( + CsvReader(path=os.path.join(__path__, 'datasets/coffeeshops.txt')), + print, +) + +if __name__ == '__main__': + import bonobo + + bonobo.run(graph) diff --git a/examples/basic_extract_transform_load_of_strings.py b/examples/basics_strings.py similarity index 51% rename from examples/basic_extract_transform_load_of_strings.py rename to examples/basics_strings.py index 830bab6..1147c7f 100644 --- a/examples/basic_extract_transform_load_of_strings.py +++ b/examples/basics_strings.py @@ -2,8 +2,6 @@ import time from random import randint from bonobo.core.graphs import Graph -from bonobo.core.strategies.executor import ThreadPoolExecutorStrategy -from bonobo.ext.console import ConsoleOutputPlugin def extract(): @@ -22,11 +20,10 @@ def load(s): print(s) -Strategy = ThreadPoolExecutorStrategy +graph = Graph() +graph.add_chain(extract, transform, load) if __name__ == '__main__': - etl = Graph() - etl.add_chain(extract, transform, load) + from bonobo import run - s = Strategy() - s.execute(etl, plugins=[ConsoleOutputPlugin()]) + run(graph) diff --git a/examples/datasets/cheap_coffeeshops_in_paris.txt b/examples/datasets/coffeeshops.txt similarity index 100% rename from examples/datasets/cheap_coffeeshops_in_paris.txt rename to examples/datasets/coffeeshops.txt diff --git a/examples/opendata_fablabs.py b/examples/opendata_fablabs.py index 6a5f082..7dd88d3 100644 --- a/examples/opendata_fablabs.py +++ b/examples/opendata_fablabs.py @@ -1,8 +1,9 @@ import json +import os from blessings import Terminal -from bonobo import console_run, tee, JsonWriter, Graph +from bonobo import tee, JsonWriter, Graph from bonobo.ext.opendatasoft import from_opendatasoft_api try: @@ -15,6 +16,7 @@ API_NETLOC = 'datanova.laposte.fr' ROWS = 100 t = Terminal() +__path__ = os.path.dirname(__file__) def _getlink(x): @@ -57,13 +59,17 @@ def display(row): graph = Graph( from_opendatasoft_api( - API_DATASET, netloc=API_NETLOC, timezone='Europe/Paris' + API_DATASET, + netloc=API_NETLOC, + timezone='Europe/Paris' ), normalize, filter_france, tee(display), - JsonWriter('fablabs.json'), + JsonWriter(path=os.path.join(__path__, 'datasets/coffeeshops.txt')), ) if __name__ == '__main__': - console_run(graph, output=True) + import bonobo + + bonobo.run(graph) diff --git a/tests/context/test_processors.py b/tests/context/test_processors.py new file mode 100644 index 0000000..a0832ea --- /dev/null +++ b/tests/context/test_processors.py @@ -0,0 +1,55 @@ +from operator import attrgetter + +from bonobo import contextual, ContextProcessor +from bonobo.context.processors import get_context_processors + + +@contextual +class CP1: + @ContextProcessor + def c(self): + pass + + @ContextProcessor + def a(self): + pass + + @ContextProcessor + def b(self): + pass + + +@contextual +class CP2(CP1): + @ContextProcessor + def f(self): + pass + + @ContextProcessor + def e(self): + pass + + @ContextProcessor + def d(self): + pass + + +@contextual +class CP3(CP2): + @ContextProcessor + def c(self): + pass + + @ContextProcessor + def b(self): + pass + + +def get_all_processors_names(cls): + return list(map(attrgetter('__name__'), get_context_processors(cls))) + + +def test_inheritance_and_ordering(): + assert get_all_processors_names(CP1) == ['c', 'a', 'b'] + assert get_all_processors_names(CP2) == ['c', 'a', 'b', 'f', 'e', 'd'] + assert get_all_processors_names(CP3) == ['c', 'a', 'b', 'f', 'e', 'd', 'c', 'b'] diff --git a/tests/core/test_contexts.py b/tests/core/test_contexts.py index aac6c56..718d412 100644 --- a/tests/core/test_contexts.py +++ b/tests/core/test_contexts.py @@ -1,6 +1,5 @@ -from bonobo import Graph, NaiveStrategy, Bag -from bonobo.core.contexts import ExecutionContext -from bonobo.util.lifecycle import with_context +from bonobo import Graph, NaiveStrategy, Bag, contextual +from bonobo.context.execution import GraphExecutionContext from bonobo.util.tokens import BEGIN, END @@ -12,11 +11,16 @@ def square(i: int) -> int: return i**2 -@with_context -def push_result(ctx, i: int): - if not hasattr(ctx.parent, 'results'): - ctx.parent.results = [] - ctx.parent.results.append(i) +@contextual +def push_result(results, i: int): + results.append(i) + + +@push_result.__processors__.append +def results(f, context): + results = [] + yield results + context.parent.results = results chain = (generate_integers, square, push_result) @@ -25,8 +29,8 @@ chain = (generate_integers, square, push_result) def test_empty_execution_context(): graph = Graph() - ctx = ExecutionContext(graph) - assert not len(ctx.components) + ctx = GraphExecutionContext(graph) + assert not len(ctx.nodes) assert not len(ctx.plugins) assert not ctx.alive @@ -46,15 +50,19 @@ def test_simple_execution_context(): graph = Graph() graph.add_chain(*chain) - ctx = ExecutionContext(graph) - assert len(ctx.components) == len(chain) + ctx = GraphExecutionContext(graph) + assert len(ctx.nodes) == len(chain) assert not len(ctx.plugins) - for i, component in enumerate(chain): - assert ctx[i].component is component + for i, node in enumerate(chain): + assert ctx[i].wrapped is node assert not ctx.alive ctx.recv(BEGIN, Bag(), END) + assert not ctx.alive + + ctx.start() + assert ctx.alive diff --git a/tests/core/test_graphs.py b/tests/core/test_graphs.py index 47a7dbd..6dfd751 100644 --- a/tests/core/test_graphs.py +++ b/tests/core/test_graphs.py @@ -24,20 +24,20 @@ def test_graph_outputs_of(): def test_graph_add_component(): g = Graph() - assert len(g.components) == 0 + assert len(g.nodes) == 0 - g.add_component(identity) - assert len(g.components) == 1 + g.add_node(identity) + assert len(g.nodes) == 1 - g.add_component(identity) - assert len(g.components) == 2 + g.add_node(identity) + assert len(g.nodes) == 2 def test_graph_add_chain(): g = Graph() - assert len(g.components) == 0 + assert len(g.nodes) == 0 g.add_chain(identity, identity, identity) - assert len(g.components) == 3 + assert len(g.nodes) == 3 assert len(g.outputs_of(BEGIN)) == 1 diff --git a/tests/core/test_stats.py b/tests/core/test_statistics.py similarity index 53% rename from tests/core/test_stats.py rename to tests/core/test_statistics.py index dd0b002..d71e0dc 100644 --- a/tests/core/test_stats.py +++ b/tests/core/test_statistics.py @@ -1,8 +1,8 @@ -from bonobo.core.stats import WithStatistics +from bonobo.core.statistics import WithStatistics class MyThingWithStats(WithStatistics): - def get_stats(self, *args, **kwargs): + def get_statistics(self, *args, **kwargs): return ( ('foo', 42), ('bar', 69), @@ -11,4 +11,4 @@ class MyThingWithStats(WithStatistics): def test_with_statistics(): o = MyThingWithStats() - assert o.get_stats_as_string() == 'foo=42 bar=69' + assert o.get_statistics_as_string() == 'foo=42 bar=69' diff --git a/tests/io/test_csv.py b/tests/io/test_csv.py index aa3ca8f..d3000ec 100644 --- a/tests/io/test_csv.py +++ b/tests/io/test_csv.py @@ -1,21 +1,22 @@ import pytest from bonobo import Bag, CsvReader, CsvWriter -from bonobo.core.contexts import ComponentExecutionContext -from bonobo.util.testing import CapturingComponentExecutionContext +from bonobo.context.execution import NodeExecutionContext +from bonobo.util.testing import CapturingNodeExecutionContext from bonobo.util.tokens import BEGIN, END def test_write_csv_to_file(tmpdir): file = tmpdir.join('output.json') - writer = CsvWriter(str(file)) - context = ComponentExecutionContext(writer, None) + writer = CsvWriter(path=str(file)) + context = NodeExecutionContext(writer, None) - context.initialize() context.recv(BEGIN, Bag({'foo': 'bar'}), Bag({'foo': 'baz', 'ignore': 'this'}), END) + + context.start() context.step() context.step() - context.finalize() + context.stop() assert file.read() == 'foo\nbar\nbaz\n' @@ -23,27 +24,18 @@ def test_write_csv_to_file(tmpdir): getattr(context, 'file') -def test_write_json_without_initializer_should_not_work(tmpdir): - file = tmpdir.join('output.json') - writer = CsvWriter(str(file)) - - context = ComponentExecutionContext(writer, None) - with pytest.raises(AttributeError): - writer(context, {'foo': 'bar'}) - - def test_read_csv_from_file(tmpdir): file = tmpdir.join('input.csv') file.write('a,b,c\na foo,b foo,c foo\na bar,b bar,c bar') - reader = CsvReader(str(file), delimiter=',') + reader = CsvReader(path=str(file), delimiter=',') - context = CapturingComponentExecutionContext(reader, None) + context = CapturingNodeExecutionContext(reader, None) - context.initialize() + context.start() context.recv(BEGIN, Bag(), END) context.step() - context.finalize() + context.stop() assert len(context.send.mock_calls) == 2 diff --git a/tests/io/test_file.py b/tests/io/test_file.py index 3816233..065fbca 100644 --- a/tests/io/test_file.py +++ b/tests/io/test_file.py @@ -1,8 +1,8 @@ import pytest from bonobo import FileWriter, Bag, FileReader -from bonobo.core.contexts import ComponentExecutionContext -from bonobo.util.testing import CapturingComponentExecutionContext +from bonobo.context.execution import NodeExecutionContext +from bonobo.util.testing import CapturingNodeExecutionContext from bonobo.util.tokens import BEGIN, END @@ -16,27 +16,24 @@ from bonobo.util.tokens import BEGIN, END def test_file_writer_in_context(tmpdir, lines, output): file = tmpdir.join('output.txt') - writer = FileWriter(str(file)) - context = ComponentExecutionContext(writer, None) + writer = FileWriter(path=str(file)) + context = NodeExecutionContext(writer, None) - context.initialize() + context.start() context.recv(BEGIN, *map(Bag, lines), END) for i in range(len(lines)): context.step() - context.finalize() + context.stop() assert file.read() == output - with pytest.raises(AttributeError): - getattr(context, 'file') - def test_file_writer_out_of_context(tmpdir): file = tmpdir.join('output.txt') - writer = FileWriter(str(file)) - fp = writer.open() - fp.write('Yosh!') - writer.close(fp) + writer = FileWriter(path=str(file)) + + with writer.open() as fp: + fp.write('Yosh!') assert file.read() == 'Yosh!' @@ -45,13 +42,13 @@ def test_file_reader_in_context(tmpdir): file = tmpdir.join('input.txt') file.write('Hello\nWorld\n') - reader = FileReader(str(file)) - context = CapturingComponentExecutionContext(reader, None) + reader = FileReader(path=str(file)) + context = CapturingNodeExecutionContext(reader, None) - context.initialize() + context.start() context.recv(BEGIN, Bag(), END) context.step() - context.finalize() + context.stop() assert len(context.send.mock_calls) == 2 diff --git a/tests/io/test_json.py b/tests/io/test_json.py index 7386872..dfee488 100644 --- a/tests/io/test_json.py +++ b/tests/io/test_json.py @@ -1,20 +1,21 @@ import pytest from bonobo import Bag, JsonWriter, JsonReader -from bonobo.core.contexts import ComponentExecutionContext -from bonobo.util.testing import CapturingComponentExecutionContext +from bonobo.context.execution import NodeExecutionContext +from bonobo.util.objects import ValueHolder +from bonobo.util.testing import CapturingNodeExecutionContext from bonobo.util.tokens import BEGIN, END def test_write_json_to_file(tmpdir): file = tmpdir.join('output.json') - writer = JsonWriter(str(file)) - context = ComponentExecutionContext(writer, None) + writer = JsonWriter(path=str(file)) + context = NodeExecutionContext(writer, None) - context.initialize() + context.start() context.recv(BEGIN, Bag({'foo': 'bar'}), END) context.step() - context.finalize() + context.stop() assert file.read() == '[\n{"foo": "bar"}\n]' @@ -25,26 +26,17 @@ def test_write_json_to_file(tmpdir): getattr(context, 'first') -def test_write_json_without_initializer_should_not_work(tmpdir): - file = tmpdir.join('output.json') - writer = JsonWriter(str(file)) - - context = ComponentExecutionContext(writer, None) - with pytest.raises(AttributeError): - writer(context, {'foo': 'bar'}) - - def test_read_json_from_file(tmpdir): file = tmpdir.join('input.json') file.write('[{"x": "foo"},{"x": "bar"}]') - reader = JsonReader(str(file)) + reader = JsonReader(path=str(file)) - context = CapturingComponentExecutionContext(reader, None) + context = CapturingNodeExecutionContext(reader, None) - context.initialize() + context.start() context.recv(BEGIN, Bag(), END) context.step() - context.finalize() + context.stop() assert len(context.send.mock_calls) == 2 diff --git a/tests/util/test_options.py b/tests/test_config.py similarity index 100% rename from tests/util/test_options.py rename to tests/test_config.py