feat: nicer errors, cleanup examples.
This commit is contained in:
@ -10,10 +10,10 @@ to another is maximal.
|
||||
from bonobo.execution.strategies import create_strategy
|
||||
from bonobo.nodes import *
|
||||
from bonobo.nodes import __all__ as _all_nodes
|
||||
from bonobo.registry import create_reader, create_writer
|
||||
from bonobo.structs.graphs import Graph
|
||||
from bonobo.util.api import ApiHelper
|
||||
from bonobo.util.environ import parse_args, get_argument_parser
|
||||
from bonobo.registry import create_reader, create_writer
|
||||
|
||||
__all__ = []
|
||||
|
||||
@ -73,6 +73,9 @@ def run(graph, *, plugins=None, services=None, strategy=None):
|
||||
import logging
|
||||
logging.getLogger().setLevel(settings.LOGGING_LEVEL.get())
|
||||
strategy = create_strategy(strategy)
|
||||
|
||||
from bonobo.util.errors import sweeten_errors
|
||||
with sweeten_errors():
|
||||
return strategy.execute(graph, plugins=plugins, services=services)
|
||||
|
||||
|
||||
|
||||
@ -41,6 +41,7 @@ class BaseGraphCommand(BaseCommand):
|
||||
Base class for CLI commands that depend on a graph definition, either from a file or from a module.
|
||||
|
||||
"""
|
||||
|
||||
required = True
|
||||
handler = None
|
||||
|
||||
|
||||
@ -1,5 +1,8 @@
|
||||
import os
|
||||
|
||||
import bonobo
|
||||
from bonobo.execution.strategies import STRATEGIES, DEFAULT_STRATEGY
|
||||
from bonobo.util.statistics import Timer
|
||||
|
||||
|
||||
def get_argument_parser(parser=None):
|
||||
@ -39,3 +42,45 @@ def get_graph_options(options):
|
||||
'_limit': (bonobo.Limit(_limit), ) if _limit else (),
|
||||
'_print': (bonobo.PrettyPrinter(), ) if _print else (),
|
||||
}
|
||||
|
||||
|
||||
def run(get_graph, get_services, *, parser=None):
    """Parse CLI options, execute an example graph and report the outcome.

    :param get_graph: callable building the graph; it is called with the
        keyword arguments produced by ``get_graph_options(options)``.
    :param get_services: zero-argument callable returning the services
        passed to ``bonobo.run``.
    :param parser: optional argument parser; ``get_argument_parser()`` is
        used when omitted.
    :return: the ``xstatus`` of the object returned by ``bonobo.run``, or
        ``1`` when ``bonobo.run`` returned ``None``. Suitable as an argument
        to ``sys.exit()``.
    """
    parser = parser or get_argument_parser()

    with bonobo.parse_args(parser) as options:
        with Timer() as timer:
            print(
                'Options:', ' '.join(
                    '{}={}'.format(k, v)
                    for k, v in sorted(options.items())
                )
            )
            retval = bonobo.run(
                get_graph(**get_graph_options(options)),
                services=get_services(),
                strategy=options['strategy'],
            )

        print('Execution time:', timer)
        print('Return value:', retval)

        # bonobo.run() may return None when the execution error has already
        # been reported and suppressed; dereferencing ``retval.xstatus`` would
        # then raise AttributeError. Report a generic failure status instead.
        xstatus = 1 if retval is None else retval.xstatus
        print('XStatus:', xstatus)
        return xstatus
|
||||
|
||||
|
||||
def get_minor_version():
    """Return the first two dot-separated components of ``bonobo.__version__``."""
    leading_components = bonobo.__version__.split('.')[:2]
    return '.'.join(leading_components)
|
||||
|
||||
|
||||
def get_datasets_dir(*dirs):
    """Return ``~/.cache/bonobo/<minor version>/<dirs...>``, creating it if missing.

    :param dirs: extra path components appended below the versioned cache root.
    :return: the resulting directory path, as a string.
    """
    path_parts = (os.path.expanduser('~'), '.cache/bonobo', get_minor_version()) + dirs
    cache_dir = os.path.join(*path_parts)
    os.makedirs(cache_dir, exist_ok=True)
    return cache_dir
|
||||
|
||||
|
||||
def get_services():
    """Return the default services for the examples.

    ``fs`` is opened on the ``datasets`` cache directory, ``fs.static`` on the
    ``datasets/static`` examples filesystem.
    """
    services = {}
    services['fs'] = bonobo.open_fs(get_datasets_dir('datasets'))
    services['fs.static'] = bonobo.open_examples_fs('datasets', 'static')
    return services
|
||||
|
||||
@ -4,7 +4,7 @@ import bonobo
|
||||
from bonobo import examples
|
||||
from bonobo.examples.datasets.coffeeshops import get_graph as get_coffeeshops_graph
|
||||
from bonobo.examples.datasets.fablabs import get_graph as get_fablabs_graph
|
||||
from bonobo.examples.datasets.services import get_services, get_datasets_dir, get_minor_version
|
||||
from bonobo.examples import get_minor_version, get_datasets_dir, get_services
|
||||
|
||||
graph_factories = {
|
||||
'coffeeshops': get_coffeeshops_graph,
|
||||
|
||||
@ -1,10 +1,12 @@
|
||||
"""
|
||||
|
||||
"""
|
||||
import sys
|
||||
|
||||
import bonobo
|
||||
from bonobo import examples
|
||||
from bonobo.contrib.opendatasoft import OpenDataSoftAPI as ODSReader
|
||||
from bonobo.examples.datasets.services import get_services
|
||||
from bonobo.examples import get_services
|
||||
|
||||
|
||||
def get_graph(graph=None, *, _limit=(), _print=()):
|
||||
@ -58,10 +60,4 @@ def get_graph(graph=None, *, _limit=(), _print=()):
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = examples.get_argument_parser()
|
||||
|
||||
with bonobo.parse_args(parser) as options:
|
||||
bonobo.run(
|
||||
get_graph(**examples.get_graph_options(options)),
|
||||
services=get_services()
|
||||
)
|
||||
sys.exit(examples.run(get_graph, get_services))
|
||||
|
||||
@ -20,8 +20,7 @@ import sys
|
||||
import bonobo
|
||||
from bonobo import examples
|
||||
from bonobo.contrib.opendatasoft import OpenDataSoftAPI
|
||||
from bonobo.examples.datasets.services import get_services
|
||||
from bonobo.util.statistics import Timer
|
||||
from bonobo.examples import get_services
|
||||
|
||||
try:
|
||||
import pycountry
|
||||
@ -65,23 +64,4 @@ def get_graph(graph=None, *, _limit=(), _print=()):
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = examples.get_argument_parser()
|
||||
|
||||
with bonobo.parse_args(parser) as options:
|
||||
with Timer() as timer:
|
||||
print(
|
||||
'Options:', ' '.join(
|
||||
'{}={}'.format(k, v)
|
||||
for k, v in sorted(options.items())
|
||||
)
|
||||
)
|
||||
retval = bonobo.run(
|
||||
get_graph(**examples.get_graph_options(options)),
|
||||
services=get_services(),
|
||||
strategy=options['strategy'],
|
||||
)
|
||||
print('Execution time:', timer)
|
||||
print('Return value:', retval)
|
||||
print('XStatus:', retval.xstatus)
|
||||
if retval.xstatus:
|
||||
sys.exit(retval.xstatus)
|
||||
sys.exit(examples.run(get_graph, get_services))
|
||||
|
||||
@ -1,20 +0,0 @@
|
||||
import os
|
||||
|
||||
import bonobo
|
||||
|
||||
|
||||
def get_minor_version():
    """Return the ``major.minor`` part of ``bonobo.__version__``."""
    version_parts = bonobo.__version__.split('.')
    return '.'.join(version_parts[:2])
|
||||
|
||||
|
||||
def get_datasets_dir(*dirs):
    """Build, create if necessary, and return the versioned datasets cache directory.

    :param dirs: path components appended below ``~/.cache/bonobo/<minor version>``.
    """
    user_home = os.path.expanduser('~')
    versioned_root = os.path.join(user_home, '.cache/bonobo', get_minor_version())
    # os.path.join with a single argument returns it unchanged, so an empty
    # ``dirs`` yields the versioned root itself.
    destination = os.path.join(versioned_root, *dirs)
    os.makedirs(destination, exist_ok=True)
    return destination
|
||||
|
||||
|
||||
def get_services():
    """Return a services mapping whose ``fs`` entry is opened on the datasets cache directory."""
    datasets_fs = bonobo.open_fs(get_datasets_dir('datasets'))
    return {'fs': datasets_fs}
|
||||
24
bonobo/examples/empty.py
Normal file
24
bonobo/examples/empty.py
Normal file
@ -0,0 +1,24 @@
|
||||
import bonobo
|
||||
import datetime
|
||||
import time
|
||||
|
||||
|
||||
def extract():
    """Placeholder extractor: yield the current time 60 times, one second apart.

    The first value is produced immediately; each of the following 59 values
    is preceded by a one second sleep. Change, rename or remove as needed.
    """
    yield datetime.datetime.now()
    for _ in range(59):
        time.sleep(1)
        yield datetime.datetime.now()
|
||||
|
||||
|
||||
def get_graph():
    """Return a ``bonobo.Graph`` on which ``add_chain()`` was called without any node."""
    empty_graph = bonobo.Graph()
    empty_graph.add_chain()
    return empty_graph
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Script entry point: parse the command line with bonobo's argument
    # parser, then run the graph built by get_graph().
    parser = bonobo.get_argument_parser()
    with bonobo.parse_args(parser):
        bonobo.run(get_graph())
|
||||
@ -1,8 +0,0 @@
|
||||
from bonobo import get_examples_path, open_fs
|
||||
|
||||
|
||||
def get_services():
    """Return the services for the file examples.

    ``fs`` is opened on the examples path, ``fs.output`` with ``open_fs()``
    called without arguments.
    """
    services = {}
    services['fs'] = open_fs(get_examples_path())
    services['fs.output'] = open_fs()
    return services
|
||||
@ -1,10 +1,13 @@
|
||||
import sys
|
||||
|
||||
import bonobo
|
||||
from bonobo.examples.files._services import get_services
|
||||
from bonobo import examples
|
||||
from bonobo.examples.files.services import get_services
|
||||
|
||||
|
||||
def get_graph(*, _limit=None, _print=False):
|
||||
return bonobo.Graph(
|
||||
bonobo.CsvReader('datasets/coffeeshops.txt'),
|
||||
bonobo.CsvReader('coffeeshops.csv'),
|
||||
*((bonobo.Limit(_limit), ) if _limit else ()),
|
||||
*((bonobo.PrettyPrinter(), ) if _print else ()),
|
||||
bonobo.CsvWriter('coffeeshops.csv', fs='fs.output')
|
||||
@ -12,25 +15,4 @@ def get_graph(*, _limit=None, _print=False):
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = bonobo.get_argument_parser()
|
||||
|
||||
parser.add_argument(
|
||||
'--limit',
|
||||
'-l',
|
||||
type=int,
|
||||
default=None,
|
||||
help='If set, limits the number of processed lines.'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--print',
|
||||
'-p',
|
||||
action='store_true',
|
||||
default=False,
|
||||
help='If set, pretty prints before writing to output file.'
|
||||
)
|
||||
|
||||
with bonobo.parse_args(parser) as options:
|
||||
bonobo.run(
|
||||
get_graph(_limit=options['limit'], _print=options['print']),
|
||||
services=get_services()
|
||||
)
|
||||
sys.exit(examples.run(get_graph, get_services))
|
||||
|
||||
@ -1,12 +1,15 @@
|
||||
import sys
|
||||
|
||||
import bonobo
|
||||
from bonobo.examples.files._services import get_services
|
||||
from bonobo import examples
|
||||
from bonobo.examples.files.services import get_services
|
||||
|
||||
|
||||
def get_graph(*, _limit=None, _print=False):
|
||||
graph = bonobo.Graph()
|
||||
|
||||
trunk = graph.add_chain(
|
||||
bonobo.JsonReader('datasets/theaters.json'),
|
||||
bonobo.JsonReader('theaters.json', fs='fs.static'),
|
||||
*((bonobo.Limit(_limit), ) if _limit else ()),
|
||||
)
|
||||
|
||||
@ -14,11 +17,11 @@ def get_graph(*, _limit=None, _print=False):
|
||||
graph.add_chain(bonobo.PrettyPrinter(), _input=trunk.output)
|
||||
|
||||
graph.add_chain(
|
||||
bonobo.JsonWriter('theaters.json', fs='fs.output'),
|
||||
bonobo.JsonWriter('theaters.output.json', fs='fs.output'),
|
||||
_input=trunk.output
|
||||
)
|
||||
graph.add_chain(
|
||||
bonobo.LdjsonWriter('theaters.ldjson', fs='fs.output'),
|
||||
bonobo.LdjsonWriter('theaters.output.ldjson', fs='fs.output'),
|
||||
_input=trunk.output
|
||||
)
|
||||
|
||||
@ -26,25 +29,4 @@ def get_graph(*, _limit=None, _print=False):
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = bonobo.get_argument_parser()
|
||||
|
||||
parser.add_argument(
|
||||
'--limit',
|
||||
'-l',
|
||||
type=int,
|
||||
default=None,
|
||||
help='If set, limits the number of processed lines.'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--print',
|
||||
'-p',
|
||||
action='store_true',
|
||||
default=False,
|
||||
help='If set, pretty prints before writing to output file.'
|
||||
)
|
||||
|
||||
with bonobo.parse_args(parser) as options:
|
||||
bonobo.run(
|
||||
get_graph(_limit=options['limit'], _print=options['print']),
|
||||
services=get_services()
|
||||
)
|
||||
sys.exit(examples.run(get_graph, get_services))
|
||||
|
||||
@ -27,6 +27,8 @@ messages categorized as spam, and (3) prints the output.
|
||||
|
||||
'''
|
||||
|
||||
import sys
|
||||
|
||||
from fs.tarfs import TarFS
|
||||
|
||||
import bonobo
|
||||
@ -61,17 +63,11 @@ def get_graph(*, _limit=(), _print=()):
|
||||
|
||||
|
||||
def get_services():
|
||||
from ._services import get_services
|
||||
return {
|
||||
**get_services(), 'fs':
|
||||
TarFS(bonobo.get_examples_path('datasets/spam.tgz'))
|
||||
**examples.get_services(), 'fs':
|
||||
TarFS(bonobo.get_examples_path('datasets', 'static', 'spam.tgz'))
|
||||
}
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = examples.get_argument_parser()
|
||||
with bonobo.parse_args(parser) as options:
|
||||
bonobo.run(
|
||||
get_graph(**examples.get_graph_options(options)),
|
||||
services=get_services()
|
||||
)
|
||||
sys.exit(examples.run(get_graph, get_services))
|
||||
|
||||
8
bonobo/examples/files/services.py
Normal file
8
bonobo/examples/files/services.py
Normal file
@ -0,0 +1,8 @@
|
||||
from bonobo import open_fs, examples
|
||||
|
||||
|
||||
def get_services():
    """Return the shared example services, plus an ``fs.output`` filesystem.

    ``fs.output`` is opened with ``open_fs()`` called without arguments and
    overrides any entry of the same name coming from ``examples.get_services()``.
    """
    services = dict(examples.get_services())
    services['fs.output'] = open_fs()
    return services
|
||||
@ -1,6 +1,8 @@
|
||||
import sys
|
||||
|
||||
import bonobo
|
||||
from bonobo import examples
|
||||
from bonobo.examples.files._services import get_services
|
||||
from bonobo.examples.files.services import get_services
|
||||
|
||||
|
||||
def skip_comments(line):
|
||||
@ -11,7 +13,7 @@ def skip_comments(line):
|
||||
|
||||
def get_graph(*, _limit=(), _print=()):
|
||||
return bonobo.Graph(
|
||||
bonobo.FileReader('datasets/passwd.txt'),
|
||||
bonobo.FileReader('passwd.txt', fs='fs.static'),
|
||||
skip_comments,
|
||||
*_limit,
|
||||
lambda s: s.split(':')[0],
|
||||
@ -21,9 +23,4 @@ def get_graph(*, _limit=(), _print=()):
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = examples.get_argument_parser()
|
||||
with bonobo.parse_args(parser) as options:
|
||||
bonobo.run(
|
||||
get_graph(**examples.get_graph_options(options)),
|
||||
services=get_services()
|
||||
)
|
||||
sys.exit(examples.run(get_graph, get_services))
|
||||
|
||||
@ -1,5 +0,0 @@
|
||||
from bonobo import open_examples_fs
|
||||
|
||||
|
||||
def get_services():
    """Return a services mapping whose ``fs`` entry is the ``datasets`` examples filesystem."""
    datasets_fs = open_examples_fs('datasets')
    return {'fs': datasets_fs}
|
||||
@ -1,23 +0,0 @@
|
||||
import bonobo
|
||||
|
||||
|
||||
def extract():
    """Yield the three sample words ``foo``, ``bar`` and ``baz``, in that order."""
    yield from ('foo', 'bar', 'baz')
|
||||
|
||||
|
||||
def transform(x):
    """Return *x* converted with its ``upper()`` method."""
    uppercased = x.upper()
    return uppercased
|
||||
|
||||
|
||||
def load(x):
    """Print *x*; nothing is returned."""
    print(x)
|
||||
|
||||
|
||||
graph = bonobo.Graph(extract, transform, load)
|
||||
|
||||
graph.__doc__ = 'hello'
|
||||
|
||||
if __name__ == '__main__':
|
||||
bonobo.run(graph)
|
||||
@ -1,14 +0,0 @@
|
||||
import bonobo
|
||||
|
||||
graph = bonobo.Graph(
|
||||
[
|
||||
'foo',
|
||||
'bar',
|
||||
'baz',
|
||||
],
|
||||
str.upper,
|
||||
print,
|
||||
)
|
||||
|
||||
if __name__ == '__main__':
|
||||
bonobo.run(graph)
|
||||
@ -1,14 +0,0 @@
|
||||
import bonobo
|
||||
|
||||
graph = bonobo.Graph(
|
||||
bonobo.FileReader('coffeeshops.txt'),
|
||||
print,
|
||||
)
|
||||
|
||||
|
||||
def get_services():
    """Return a services mapping exposing the ``datasets`` examples filesystem as ``fs``."""
    services = {}
    services['fs'] = bonobo.open_examples_fs('datasets')
    return services
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
bonobo.run(graph, services=get_services())
|
||||
@ -1,23 +0,0 @@
|
||||
import bonobo
|
||||
|
||||
|
||||
def split_one(line):
    """Split *line* on its first ``', '`` into a ``name`` / ``address`` mapping.

    When the separator is absent, only the ``name`` key is present.
    """
    fields = line.split(', ', 1)
    return {key: value for key, value in zip(("name", "address"), fields)}
|
||||
|
||||
|
||||
graph = bonobo.Graph(
|
||||
bonobo.FileReader('coffeeshops.txt'),
|
||||
split_one,
|
||||
bonobo.JsonWriter('coffeeshops.json', fs='fs.output'),
|
||||
)
|
||||
|
||||
|
||||
def get_services():
    """Return the input (``fs``) and output (``fs.output``) filesystems.

    ``fs`` is the ``datasets`` examples filesystem; ``fs.output`` comes from
    ``bonobo.open_fs()`` called without arguments.
    """
    input_fs = bonobo.open_examples_fs('datasets')
    output_fs = bonobo.open_fs()
    return {'fs': input_fs, 'fs.output': output_fs}
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
bonobo.run(graph, services=get_services())
|
||||
@ -1,36 +0,0 @@
|
||||
import json
|
||||
|
||||
import bonobo
|
||||
|
||||
|
||||
def split_one_to_map(line):
    """Split *line* on its first ``', '`` and return a one-entry ``{left: right}`` dict.

    :raises ValueError: when *line* does not contain the separator.
    """
    parts = line.split(', ', 1)
    # Unpacking fails with ValueError unless exactly two parts were produced.
    name, address = parts
    return {name: address}
|
||||
|
||||
|
||||
class MyJsonWriter(bonobo.JsonWriter):
    """JSON writer using ``{`` / ``}`` as prefix and suffix.

    Each row is written as the JSON encoding of its keyword arguments with
    the surrounding braces removed.
    """

    prefix = '{'
    suffix = '}'

    def write(self, fs, file, lineno, **row):
        """Serialize *row* without its outer braces and delegate to ``bonobo.FileWriter.write``."""
        encoded = json.dumps(row)
        # Drop the first and last characters, i.e. the enclosing '{' and '}'.
        inner = encoded[1:-1]
        return bonobo.FileWriter.write(self, fs, file, lineno, inner)
|
||||
|
||||
|
||||
graph = bonobo.Graph(
|
||||
bonobo.FileReader('coffeeshops.txt'),
|
||||
split_one_to_map,
|
||||
MyJsonWriter('coffeeshops.json', fs='fs.output'),
|
||||
)
|
||||
|
||||
|
||||
def get_services():
    """Return ``fs`` (the ``datasets`` examples filesystem) and ``fs.output`` (``bonobo.open_fs()``)."""
    services = {'fs': bonobo.open_examples_fs('datasets')}
    services['fs.output'] = bonobo.open_fs()
    return services
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
bonobo.run(graph, services=get_services())
|
||||
@ -1,25 +0,0 @@
|
||||
import bonobo
|
||||
|
||||
|
||||
def generate_data():
    """Yield ``foo``, ``bar`` and ``baz`` one after the other."""
    for word in ('foo', 'bar', 'baz'):
        yield word
|
||||
|
||||
|
||||
def uppercase(x: str):
    """Return the upper-cased version of *x*."""
    result = x.upper()
    return result
|
||||
|
||||
|
||||
def output(x: str):
    """Print *x*; nothing is returned."""
    print(x)
|
||||
|
||||
|
||||
graph = bonobo.Graph(
|
||||
generate_data,
|
||||
uppercase,
|
||||
output,
|
||||
)
|
||||
|
||||
if __name__ == '__main__':
|
||||
bonobo.run(graph)
|
||||
@ -1,11 +0,0 @@
|
||||
import bonobo
|
||||
|
||||
# Represent our data processor as a simple directed graph of callables.
|
||||
graph = bonobo.Graph(
|
||||
['foo', 'bar', 'baz'],
|
||||
str.upper,
|
||||
print,
|
||||
)
|
||||
|
||||
if __name__ == '__main__':
|
||||
bonobo.run(graph)
|
||||
@ -40,7 +40,7 @@ def create_strategy(name=None):
|
||||
if name is None:
|
||||
name = DEFAULT_STRATEGY
|
||||
|
||||
logging.debug('Creating strategy {}...'.format(name))
|
||||
logging.debug('Creating execution strategy {!r}...'.format(name))
|
||||
|
||||
try:
|
||||
factory = STRATEGIES[name]
|
||||
|
||||
@ -12,6 +12,8 @@ class Strategy:
|
||||
self.GraphExecutionContextType = GraphExecutionContextType or self.GraphExecutionContextType
|
||||
|
||||
def create_graph_execution_context(self, graph, *args, GraphExecutionContextType=None, **kwargs):
    """Instantiate an execution context for *graph*.

    :param graph: the graph to execute; must have a non-zero ``len()``.
    :param GraphExecutionContextType: optional factory overriding
        ``self.GraphExecutionContextType`` for this call.
    :return: the result of calling the factory with ``graph, *args, **kwargs``.
    :raises ValueError: when *graph* is empty.
    """
    if len(graph) == 0:
        raise ValueError('You provided an empty graph, which does not really make sense. Please add some nodes.')

    context_factory = GraphExecutionContextType or self.GraphExecutionContextType
    return context_factory(graph, *args, **kwargs)
|
||||
|
||||
def execute(self, graph, *args, **kwargs):
|
||||
|
||||
68
bonobo/util/errors.py
Normal file
68
bonobo/util/errors.py
Normal file
@ -0,0 +1,68 @@
|
||||
import logging
|
||||
import re
|
||||
from contextlib import contextmanager
|
||||
from sys import exc_info
|
||||
|
||||
from mondrian import term
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@contextmanager
def sweeten_errors():
    """Context manager printing exceptions as a boxed, decorated message.

    Any ``Exception`` raised by the managed block is printed inside a box
    drawn with box-drawing characters. The first element of ``exc.args`` is
    shown on the first line, next to the exception type name. Remaining
    elements get one line each, separated from the first line by a blank
    line. Text enclosed in backticks is passed through ``term.bold`` and the
    backticks are removed.

    The box is at least ``min(80, terminal width)`` columns wide. It grows to
    fit the longest line, up to 120 columns.

    The exception is NOT re-raised: the ``with`` statement completes normally
    and code following it keeps running. The original traceback is only
    logged, at DEBUG level, on the root logger.
    """
    try:
        yield
    except Exception as exc:
        SPACES = 2
        MAX_LINE_LENGTH = 120
        w = term.white
        prefix = w('║' + ' ' * (SPACES - 1))
        suffix = w(' ' * (SPACES - 1) + '║')

        pre_re = re.compile('([^`]*)`([^`]*)`([^`]*)')

        def format_arg(arg):
            # Length of the text once backticks are stripped, measured before
            # any terminal decoration is added, so padding can be computed.
            length = len(pre_re.sub('\\1\\2\\3', arg))

            arg = pre_re.sub(w('\\1') + term.bold('\\2') + w('\\3'), arg)
            arg = re.sub(r'^ \$ (.*)', term.lightblack(' $ ') + term.reset('\\1'), arg)

            return (arg, length)

        def f(*args):
            return ''.join(args)

        exc_name = type(exc).__name__

        # Exception arguments are not necessarily strings (e.g. OSError's
        # errno), so coerce them before measuring or formatting. An exception
        # raised without arguments still gets one (empty) line, so that its
        # type name is displayed instead of an empty box.
        formatted_args = [format_arg(str(arg)) for arg in (exc.args or ('', ))]

        term_width, _ = term.get_size()
        line_length = min(80, term_width)
        for i, (_, arg_length) in enumerate(formatted_args):
            needed = arg_length + 2 * SPACES
            if not i:
                # The first line also carries ' <ExceptionName> ' and a space.
                needed += len(exc_name) + 3
            line_length = max(line_length, needed)
        # Grow to fit the content, but never beyond the maximum width.
        line_length = min(line_length, MAX_LINE_LENGTH)

        print(f(w('╔' + '═' * (line_length - 2) + '╗')))
        for i, (arg_formatted, arg_length) in enumerate(formatted_args):

            if i == 1:
                # Blank separator between the headline and the details.
                print(f(
                    prefix,
                    ' ' * (line_length - 2 * SPACES),
                    suffix,
                ))

            if not i:
                # first line
                print(
                    f(
                        prefix,
                        term.red_bg(term.bold(' ' + exc_name + ' ')),
                        ' ',
                        w(arg_formatted),
                        ' ' * (line_length - (arg_length + 3 + len(exc_name) + 2 * SPACES)),
                        suffix,
                    )
                )
            else:
                # other lines
                print(f(prefix, arg_formatted + ' ' * (line_length - arg_length - 2 * SPACES), suffix))

        print(f(w('╚' + '═' * (line_length - 2) + '╝')))

        logging.getLogger().debug('This error was caused by the following exception chain.', exc_info=exc_info())
|
||||
Reference in New Issue
Block a user