diff --git a/bonobo/_api.py b/bonobo/_api.py index 0df8594..a960cbc 100644 --- a/bonobo/_api.py +++ b/bonobo/_api.py @@ -10,10 +10,10 @@ to another is maximal. from bonobo.execution.strategies import create_strategy from bonobo.nodes import * from bonobo.nodes import __all__ as _all_nodes +from bonobo.registry import create_reader, create_writer from bonobo.structs.graphs import Graph from bonobo.util.api import ApiHelper from bonobo.util.environ import parse_args, get_argument_parser -from bonobo.registry import create_reader, create_writer __all__ = [] @@ -73,7 +73,10 @@ def run(graph, *, plugins=None, services=None, strategy=None): import logging logging.getLogger().setLevel(settings.LOGGING_LEVEL.get()) strategy = create_strategy(strategy) - return strategy.execute(graph, plugins=plugins, services=services) + + from bonobo.util.errors import sweeten_errors + with sweeten_errors(): + return strategy.execute(graph, plugins=plugins, services=services) def _inspect_as_graph(graph): diff --git a/bonobo/commands/base.py b/bonobo/commands/base.py index da2967f..4f15daa 100644 --- a/bonobo/commands/base.py +++ b/bonobo/commands/base.py @@ -41,6 +41,7 @@ class BaseGraphCommand(BaseCommand): Base class for CLI commands that depends on a graph definition, either from a file or from a module. 
""" + required = True handler = None diff --git a/bonobo/examples/__init__.py b/bonobo/examples/__init__.py index dba9989..eaea6f0 100644 --- a/bonobo/examples/__init__.py +++ b/bonobo/examples/__init__.py @@ -1,5 +1,8 @@ +import os + import bonobo from bonobo.execution.strategies import STRATEGIES, DEFAULT_STRATEGY +from bonobo.util.statistics import Timer def get_argument_parser(parser=None): @@ -39,3 +42,67 @@ def get_graph_options(options): '_limit': (bonobo.Limit(_limit), ) if _limit else (), '_print': (bonobo.PrettyPrinter(), ) if _print else (), }
+
+
+def run(get_graph, get_services, *, parser=None):
+    """
+    Run an example graph from the command line and return a process exit status.
+
+    :param get_graph: callable building the graph; it receives the keyword arguments computed by
+        get_graph_options() from the parsed command line options.
+    :param get_services: callable (no arguments) returning the services passed to bonobo.run().
+    :param parser: argument parser to use, defaults to the one built by get_argument_parser().
+    :return: the ``xstatus`` of the value returned by bonobo.run(), or 1 if bonobo.run() returned nothing.
+    """
+    parser = parser or get_argument_parser()
+
+    with bonobo.parse_args(parser) as options:
+        with Timer() as timer:
+            print(
+                'Options:', ' '.join(
+                    '{}={}'.format(k, v)
+                    for k, v in sorted(options.items())
+                )
+            )
+            retval = bonobo.run(
+                get_graph(**get_graph_options(options)),
+                services=get_services(),
+                strategy=options['strategy'],
+            )
+        print('Execution time:', timer)
+        print('Return value:', retval)
+        if retval is None:
+            # bonobo.run() executes the graph inside sweeten_errors(), which prints the error and swallows it:
+            # in that case nothing is returned and there is no xstatus to read, so report a generic failure
+            # instead of crashing with an AttributeError.
+            return 1
+        print('XStatus:', retval.xstatus)
+        return retval.xstatus
+
+
+def get_minor_version():
+    """Return the first two dot-separated components of ``bonobo.__version__`` (e.g. "0.6" for "0.6.1")."""
+    return '.'.join(bonobo.__version__.split('.')[:2])
+
+
+def get_datasets_dir(*dirs):
+    """
+    Return ``~/.cache/bonobo/<minor version>/<dirs...>``, creating the directory if it does not exist yet.
+    """
+    home_dir = os.path.expanduser('~')
+    target_dir = os.path.join(
+        home_dir, '.cache/bonobo', get_minor_version(), *dirs
+    )
+    os.makedirs(target_dir, exist_ok=True)
+    return target_dir
+
+
+def get_services():
+    """
+    Return the services shared by the examples: ``fs`` is opened on the local datasets cache directory and
+    ``fs.static`` is what ``bonobo.open_examples_fs('datasets', 'static')`` opens (presumably bundled data).
+    """
+    return {
+        'fs': bonobo.open_fs(get_datasets_dir('datasets')),
+        'fs.static': bonobo.open_examples_fs('datasets', 'static'),
+    }
diff --git a/bonobo/examples/coffeeshops.csv b/bonobo/examples/coffeeshops.csv deleted file mode 100644 index e69de29..0000000 diff --git a/bonobo/examples/datasets/__main__.py b/bonobo/examples/datasets/__main__.py index a62ce33..e330dc4 100644 --- a/bonobo/examples/datasets/__main__.py +++ b/bonobo/examples/datasets/__main__.py @@ -4,7 +4,7 @@ import bonobo from bonobo import examples from bonobo.examples.datasets.coffeeshops import get_graph
as get_coffeeshops_graph from bonobo.examples.datasets.fablabs import get_graph as get_fablabs_graph -from bonobo.examples.datasets.services import get_services, get_datasets_dir, get_minor_version +from bonobo.examples import get_minor_version, get_datasets_dir, get_services graph_factories = { 'coffeeshops': get_coffeeshops_graph, diff --git a/bonobo/examples/datasets/coffeeshops.py b/bonobo/examples/datasets/coffeeshops.py index 714b2a1..067f593 100644 --- a/bonobo/examples/datasets/coffeeshops.py +++ b/bonobo/examples/datasets/coffeeshops.py @@ -1,10 +1,12 @@ """ """ +import sys + import bonobo from bonobo import examples from bonobo.contrib.opendatasoft import OpenDataSoftAPI as ODSReader -from bonobo.examples.datasets.services import get_services +from bonobo.examples import get_services def get_graph(graph=None, *, _limit=(), _print=()): @@ -58,10 +60,4 @@ def get_graph(graph=None, *, _limit=(), _print=()): if __name__ == '__main__': - parser = examples.get_argument_parser() - - with bonobo.parse_args(parser) as options: - bonobo.run( - get_graph(**examples.get_graph_options(options)), - services=get_services() - ) + sys.exit(examples.run(get_graph, get_services)) diff --git a/bonobo/examples/datasets/fablabs.py b/bonobo/examples/datasets/fablabs.py index 807440b..24aaa71 100644 --- a/bonobo/examples/datasets/fablabs.py +++ b/bonobo/examples/datasets/fablabs.py @@ -20,8 +20,7 @@ import sys import bonobo from bonobo import examples from bonobo.contrib.opendatasoft import OpenDataSoftAPI -from bonobo.examples.datasets.services import get_services -from bonobo.util.statistics import Timer +from bonobo.examples import get_services try: import pycountry @@ -65,23 +64,4 @@ def get_graph(graph=None, *, _limit=(), _print=()): if __name__ == '__main__': - parser = examples.get_argument_parser() - - with bonobo.parse_args(parser) as options: - with Timer() as timer: - print( - 'Options:', ' '.join( - '{}={}'.format(k, v) - for k, v in sorted(options.items()) - ) - ) - 
retval = bonobo.run( - get_graph(**examples.get_graph_options(options)), - services=get_services(), - strategy=options['strategy'], - ) - print('Execution time:', timer) - print('Return value:', retval) - print('XStatus:', retval.xstatus) - if retval.xstatus: - sys.exit(retval.xstatus) + sys.exit(examples.run(get_graph, get_services)) diff --git a/bonobo/examples/datasets/services.py b/bonobo/examples/datasets/services.py deleted file mode 100644 index 6412156..0000000 --- a/bonobo/examples/datasets/services.py +++ /dev/null @@ -1,20 +0,0 @@ -import os - -import bonobo - - -def get_minor_version(): - return '.'.join(bonobo.__version__.split('.')[:2]) - - -def get_datasets_dir(*dirs): - home_dir = os.path.expanduser('~') - target_dir = os.path.join( - home_dir, '.cache/bonobo', get_minor_version(), *dirs - ) - os.makedirs(target_dir, exist_ok=True) - return target_dir - - -def get_services(): - return {'fs': bonobo.open_fs(get_datasets_dir('datasets'))} diff --git a/bonobo/examples/empty.py b/bonobo/examples/empty.py new file mode 100644 index 0000000..e144923 --- /dev/null +++ b/bonobo/examples/empty.py @@ -0,0 +1,24 @@ +import bonobo +import datetime +import time + + +def extract(): + """Placeholder, change, rename, remove... 
""" + for x in range(60): + if x: + time.sleep(1) + yield datetime.datetime.now() + + +def get_graph(): + graph = bonobo.Graph() + graph.add_chain() + + return graph + + +if __name__ == '__main__': + parser = bonobo.get_argument_parser() + with bonobo.parse_args(parser): + bonobo.run(get_graph()) diff --git a/bonobo/examples/files/_services.py b/bonobo/examples/files/_services.py deleted file mode 100644 index 825e39d..0000000 --- a/bonobo/examples/files/_services.py +++ /dev/null @@ -1,8 +0,0 @@ -from bonobo import get_examples_path, open_fs - - -def get_services(): - return { - 'fs': open_fs(get_examples_path()), - 'fs.output': open_fs(), - } diff --git a/bonobo/examples/files/csv_handlers.py b/bonobo/examples/files/csv_handlers.py index acc6189..369b680 100644 --- a/bonobo/examples/files/csv_handlers.py +++ b/bonobo/examples/files/csv_handlers.py
@@ -1,10 +1,15 @@
+import sys
+
 import bonobo
-from bonobo.examples.files._services import get_services
+from bonobo import examples
+from bonobo.examples.files.services import get_services
 
 
 def get_graph(*, _limit=None, _print=False):
     return bonobo.Graph(
-        bonobo.CsvReader('datasets/coffeeshops.txt'),
-        *((bonobo.Limit(_limit), ) if _limit else ()),
+        bonobo.CsvReader('coffeeshops.csv'),
+        # examples.run() hands over ready-made nodes (a tuple built by examples.get_graph_options()), while
+        # direct callers may still pass a plain number: accept both instead of wrapping a tuple in Limit().
+        *(_limit if isinstance(_limit, tuple) else (bonobo.Limit(_limit), ) if _limit else ()),
         *((bonobo.PrettyPrinter(), ) if _print else ()),
         bonobo.CsvWriter('coffeeshops.csv', fs='fs.output')
@@ -12,25 +17,4 @@ def get_graph(*, _limit=None, _print=False): if __name__ == '__main__': - parser = bonobo.get_argument_parser() - - parser.add_argument( - '--limit', - '-l', - type=int, - default=None, - help='If set, limits the number of processed lines.' - ) - parser.add_argument( - '--print', - '-p', - action='store_true', - default=False, - help='If set, pretty prints before writing to output file.' - ) - - with bonobo.parse_args(parser) as options: - bonobo.run( - get_graph(_limit=options['limit'], _print=options['print']), - services=get_services() - ) + sys.exit(examples.run(get_graph, get_services)) diff --git a/bonobo/examples/files/json_handlers.py b/bonobo/examples/files/json_handlers.py index 819a8fd..50908d3 100644 --- a/bonobo/examples/files/json_handlers.py +++ b/bonobo/examples/files/json_handlers.py
@@ -1,12 +1,17 @@
+import sys
+
 import bonobo
-from bonobo.examples.files._services import get_services
+from bonobo import examples
+from bonobo.examples.files.services import get_services
 
 
 def get_graph(*, _limit=None, _print=False):
     graph = bonobo.Graph()
 
     trunk = graph.add_chain(
-        bonobo.JsonReader('datasets/theaters.json'),
-        *((bonobo.Limit(_limit), ) if _limit else ()),
+        bonobo.JsonReader('theaters.json', fs='fs.static'),
+        # examples.run() hands over ready-made nodes (a tuple built by examples.get_graph_options()), while
+        # direct callers may still pass a plain number: accept both instead of wrapping a tuple in Limit().
+        *(_limit if isinstance(_limit, tuple) else (bonobo.Limit(_limit), ) if _limit else ()),
     )
 
@@ -14,11 +19,11 @@ def get_graph(*, _limit=None, _print=False): graph.add_chain(bonobo.PrettyPrinter(), _input=trunk.output) graph.add_chain( - bonobo.JsonWriter('theaters.json', fs='fs.output'), + bonobo.JsonWriter('theaters.output.json', fs='fs.output'), _input=trunk.output ) graph.add_chain( - bonobo.LdjsonWriter('theaters.ldjson', fs='fs.output'), + bonobo.LdjsonWriter('theaters.output.ldjson', fs='fs.output'), _input=trunk.output ) @@ -26,25 +31,4 @@ def get_graph(*, _limit=None, _print=False): if __name__ == '__main__': - parser = bonobo.get_argument_parser() - - parser.add_argument( - '--limit', - '-l', - type=int, - default=None, - help='If set, limits the number of processed lines.' - ) - parser.add_argument( - '--print', - '-p', - action='store_true', - default=False, - help='If set, pretty prints before writing to output file.' - ) - - with bonobo.parse_args(parser) as options: - bonobo.run( - get_graph(_limit=options['limit'], _print=options['print']), - services=get_services() - ) + sys.exit(examples.run(get_graph, get_services)) diff --git a/bonobo/examples/files/pickle_handlers.py b/bonobo/examples/files/pickle_handlers.py index 0b90955..4557686 100644 --- a/bonobo/examples/files/pickle_handlers.py +++ b/bonobo/examples/files/pickle_handlers.py @@ -27,6 +27,8 @@ messages categorized as spam, and (3) prints the output. ''' +import sys + from fs.tarfs import TarFS import bonobo @@ -61,17 +63,11 @@ def get_graph(*, _limit=(), _print=()): def get_services(): - from ._services import get_services return { - **get_services(), 'fs': - TarFS(bonobo.get_examples_path('datasets/spam.tgz')) + **examples.get_services(), 'fs': + TarFS(bonobo.get_examples_path('datasets', 'static', 'spam.tgz')) } if __name__ == '__main__': - parser = examples.get_argument_parser() - with bonobo.parse_args(parser) as options: - bonobo.run( - get_graph(**examples.get_graph_options(options)), - services=get_services() - ) + sys.exit(examples.run(get_graph, get_services)) diff --git a/bonobo/examples/files/services.py b/bonobo/examples/files/services.py new file mode 100644 index 0000000..38b61b9 --- /dev/null +++ b/bonobo/examples/files/services.py @@ -0,0 +1,8 @@ +from bonobo import open_fs, examples + + +def get_services(): + return { + **examples.get_services(), + 'fs.output': open_fs(), + } diff --git a/bonobo/examples/files/text_handlers.py b/bonobo/examples/files/text_handlers.py index 2e91227..be37c4a 100644 --- a/bonobo/examples/files/text_handlers.py +++ b/bonobo/examples/files/text_handlers.py @@ -1,6 +1,8 @@ +import sys + import bonobo from bonobo import examples -from bonobo.examples.files._services import get_services +from bonobo.examples.files.services import get_services def skip_comments(line): @@ -11,7 +13,7 @@ def skip_comments(line): def get_graph(*, _limit=(), _print=()): return bonobo.Graph( -
bonobo.FileReader('datasets/passwd.txt'), + bonobo.FileReader('passwd.txt', fs='fs.static'), skip_comments, *_limit, lambda s: s.split(':')[0], @@ -21,9 +23,4 @@ def get_graph(*, _limit=(), _print=()): if __name__ == '__main__': - parser = examples.get_argument_parser() - with bonobo.parse_args(parser) as options: - bonobo.run( - get_graph(**examples.get_graph_options(options)), - services=get_services() - ) + sys.exit(examples.run(get_graph, get_services)) diff --git a/bonobo/examples/tutorials/__init__.py b/bonobo/examples/tutorials/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/bonobo/examples/tutorials/_services.py b/bonobo/examples/tutorials/_services.py deleted file mode 100644 index 25d783d..0000000 --- a/bonobo/examples/tutorials/_services.py +++ /dev/null @@ -1,5 +0,0 @@ -from bonobo import open_examples_fs - - -def get_services(): - return {'fs': open_examples_fs('datasets')} diff --git a/bonobo/examples/tutorials/tut01e01.py b/bonobo/examples/tutorials/tut01e01.py deleted file mode 100644 index c524039..0000000 --- a/bonobo/examples/tutorials/tut01e01.py +++ /dev/null @@ -1,23 +0,0 @@ -import bonobo - - -def extract(): - yield 'foo' - yield 'bar' - yield 'baz' - - -def transform(x): - return x.upper() - - -def load(x): - print(x) - - -graph = bonobo.Graph(extract, transform, load) - -graph.__doc__ = 'hello' - -if __name__ == '__main__': - bonobo.run(graph) diff --git a/bonobo/examples/tutorials/tut01e02.py b/bonobo/examples/tutorials/tut01e02.py deleted file mode 100644 index 78b7f43..0000000 --- a/bonobo/examples/tutorials/tut01e02.py +++ /dev/null @@ -1,14 +0,0 @@ -import bonobo - -graph = bonobo.Graph( - [ - 'foo', - 'bar', - 'baz', - ], - str.upper, - print, -) - -if __name__ == '__main__': - bonobo.run(graph) diff --git a/bonobo/examples/tutorials/tut02e01_read.py b/bonobo/examples/tutorials/tut02e01_read.py deleted file mode 100644 index 362051a..0000000 --- a/bonobo/examples/tutorials/tut02e01_read.py +++ /dev/null @@ 
-1,14 +0,0 @@ -import bonobo - -graph = bonobo.Graph( - bonobo.FileReader('coffeeshops.txt'), - print, -) - - -def get_services(): - return {'fs': bonobo.open_examples_fs('datasets')} - - -if __name__ == '__main__': - bonobo.run(graph, services=get_services()) diff --git a/bonobo/examples/tutorials/tut02e02_write.py b/bonobo/examples/tutorials/tut02e02_write.py deleted file mode 100644 index a33a11b..0000000 --- a/bonobo/examples/tutorials/tut02e02_write.py +++ /dev/null @@ -1,23 +0,0 @@ -import bonobo - - -def split_one(line): - return dict(zip(("name", "address"), line.split(', ', 1))) - - -graph = bonobo.Graph( - bonobo.FileReader('coffeeshops.txt'), - split_one, - bonobo.JsonWriter('coffeeshops.json', fs='fs.output'), -) - - -def get_services(): - return { - 'fs': bonobo.open_examples_fs('datasets'), - 'fs.output': bonobo.open_fs(), - } - - -if __name__ == '__main__': - bonobo.run(graph, services=get_services()) diff --git a/bonobo/examples/tutorials/tut02e03_writeasmap.py b/bonobo/examples/tutorials/tut02e03_writeasmap.py deleted file mode 100644 index afc251e..0000000 --- a/bonobo/examples/tutorials/tut02e03_writeasmap.py +++ /dev/null @@ -1,36 +0,0 @@ -import json - -import bonobo - - -def split_one_to_map(line): - k, v = line.split(', ', 1) - return {k: v} - - -class MyJsonWriter(bonobo.JsonWriter): - prefix, suffix = '{', '}' - - def write(self, fs, file, lineno, **row): - return bonobo.FileWriter.write( - self, fs, file, lineno, - json.dumps(row)[1:-1] - ) - - -graph = bonobo.Graph( - bonobo.FileReader('coffeeshops.txt'), - split_one_to_map, - MyJsonWriter('coffeeshops.json', fs='fs.output'), -) - - -def get_services(): - return { - 'fs': bonobo.open_examples_fs('datasets'), - 'fs.output': bonobo.open_fs(), - } - - -if __name__ == '__main__': - bonobo.run(graph, services=get_services()) diff --git a/bonobo/examples/tutorials/tutorial_basics_firststeps.py b/bonobo/examples/tutorials/tutorial_basics_firststeps.py deleted file mode 100644 index 
d024287..0000000 --- a/bonobo/examples/tutorials/tutorial_basics_firststeps.py +++ /dev/null @@ -1,25 +0,0 @@ -import bonobo - - -def generate_data(): - yield 'foo' - yield 'bar' - yield 'baz' - - -def uppercase(x: str): - return x.upper() - - -def output(x: str): - print(x) - - -graph = bonobo.Graph( - generate_data, - uppercase, - output, -) - -if __name__ == '__main__': - bonobo.run(graph) diff --git a/bonobo/examples/tutorials/tutorial_basics_summary.py b/bonobo/examples/tutorials/tutorial_basics_summary.py deleted file mode 100644 index a75e0f5..0000000 --- a/bonobo/examples/tutorials/tutorial_basics_summary.py +++ /dev/null @@ -1,11 +0,0 @@ -import bonobo - -# Represent our data processor as a simple directed graph of callables. -graph = bonobo.Graph( - ['foo', 'bar', 'baz'], - str.upper, - print, -) - -if __name__ == '__main__': - bonobo.run(graph) diff --git a/bonobo/execution/strategies/__init__.py b/bonobo/execution/strategies/__init__.py index 638826f..318550e 100644 --- a/bonobo/execution/strategies/__init__.py +++ b/bonobo/execution/strategies/__init__.py @@ -40,7 +40,7 @@ def create_strategy(name=None): if name is None: name = DEFAULT_STRATEGY - logging.debug('Creating strategy {}...'.format(name)) + logging.debug('Creating execution strategy {!r}...'.format(name)) try: factory = STRATEGIES[name] diff --git a/bonobo/execution/strategies/base.py b/bonobo/execution/strategies/base.py index 0a8d2a5..69e1d65 100644 --- a/bonobo/execution/strategies/base.py +++ b/bonobo/execution/strategies/base.py @@ -12,6 +12,8 @@ class Strategy: self.GraphExecutionContextType = GraphExecutionContextType or self.GraphExecutionContextType def create_graph_execution_context(self, graph, *args, GraphExecutionContextType=None, **kwargs): + if not len(graph): + raise ValueError('You provided an empty graph, which does not really make sense. 
Please add some nodes.') return (GraphExecutionContextType or self.GraphExecutionContextType)(graph, *args, **kwargs) def execute(self, graph, *args, **kwargs): diff --git a/bonobo/util/errors.py b/bonobo/util/errors.py new file mode 100644 index 0000000..a24ce94 --- /dev/null +++ b/bonobo/util/errors.py
@@ -0,0 +1,88 @@
+import logging
+import re
+from contextlib import contextmanager
+from sys import exc_info
+
+from mondrian import term
+
+logger = logging.getLogger(__name__)
+
+
+@contextmanager
+def sweeten_errors():
+    """
+    Print any exception raised inside the ``with`` block as a framed, styled message on standard output.
+
+    The first element of ``exc.args`` is printed next to the exception class name; the following ones are
+    printed one per line, separated from the first one by an empty line. Text between backticks is printed
+    in bold (without the backticks) and a leading `` $ `` prompt is styled with ``term.lightblack``.
+
+    The exception is swallowed: execution resumes after the ``with`` block, so a ``return`` statement placed
+    inside the block never completes when the block raises. The traceback is only logged, at DEBUG level.
+    """
+    try:
+        yield
+    except Exception as exc:
+        SPACES = 2
+        MAX_LINE_LENGTH = 120
+        w = term.white
+        prefix = w('║' + ' ' * (SPACES - 1))
+        suffix = w(' ' * (SPACES - 1) + '║')
+
+        pre_re = re.compile('([^`]*)`([^`]*)`([^`]*)')
+
+        def format_arg(arg):
+            # The visible length ignores the backticks, which are markup and never printed.
+            length = len(pre_re.sub('\\1\\2\\3', arg))
+
+            arg = pre_re.sub(w('\\1') + term.bold('\\2') + w('\\3'), arg)
+            arg = re.sub(r'^ \$ (.*)', term.lightblack(' $ ') + term.reset('\\1'), arg)
+
+            return (arg, length)
+
+        def f(*args):
+            return ''.join(args)
+
+        exc_name = type(exc).__name__
+        # Exception arguments can be any object (or missing altogether): normalize them to a non-empty list
+        # of formatted strings, so that rendering the error cannot itself fail and hide the original problem.
+        lines = [format_arg(str(arg)) for arg in exc.args] or [format_arg('')]
+
+        term_width = term.get_size()[0]
+        line_length = min(80, term_width)
+        for i, (_, arg_length) in enumerate(lines):
+            # The first line also holds the exception name, its two padding spaces and one separator space.
+            needed = arg_length + 2 * SPACES + (0 if i else len(exc_name) + 3)
+            # Grow the frame to fit the longest line, but never beyond MAX_LINE_LENGTH.
+            line_length = min(max(line_length, needed), MAX_LINE_LENGTH)
+
+        print(f(w('╔' + '═' * (line_length - 2) + '╗')))
+        for i, (arg_formatted, arg_length) in enumerate(lines):
+
+            if i == 1:
+                # Empty line between the headline and the detail lines.
+                print(f(
+                    prefix,
+                    ' ' * (line_length - 2 * SPACES),
+                    suffix,
+                ))
+
+            if not i:
+                # first line
+                print(
+                    f(
+                        prefix,
+                        term.red_bg(term.bold(' ' + exc_name + ' ')),
+                        ' ',
+                        w(arg_formatted),
+                        ' ' * (line_length - (arg_length + 3 + len(exc_name) + 2 * SPACES)),
+                        suffix,
+                    )
+                )
+            else:
+                # other lines
+                print(f(prefix, arg_formatted + ' ' * (line_length - arg_length - 2 * SPACES), suffix))
+
+        print(f(w('╚' + '═' * (line_length - 2) + '╝')))
+
+        logging.getLogger().debug('This error was caused by the following exception chain.', exc_info=exc_info())