From 650b49a41a41012b583fc0188dc911645195a25d Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Sun, 22 Oct 2017 09:24:34 +0200 Subject: [PATCH] [django, google] Implements basic extensions for django and google oauth systems. Using those extensions means you have the correct dependencies installed, and that you know about the external system. Django: just provide an ETLCommand class that contains all the shortcuts to write django management commands based on Bonobo. Google: shortcuts to create the necessary objects for oauth flow, with local caching of credentials. Both those extensions are not stable and will evolve. --- bonobo/events.py | 3 +++ bonobo/execution/base.py | 12 +-------- bonobo/execution/graph.py | 9 +++++++ bonobo/execution/plugin.py | 6 ++--- bonobo/ext/console.py | 45 ++++++++++++++++++++++----------- bonobo/ext/django.py | 47 +++++++++++++++++++++++++++++++++++ bonobo/ext/google.py | 43 ++++++++++++++++++++++++++++++++ bonobo/plugins.py | 28 +-------------------- bonobo/strategies/executor.py | 1 + docs/conf.py | 5 ++-- 10 files changed, 141 insertions(+), 58 deletions(-) create mode 100644 bonobo/events.py create mode 100644 bonobo/ext/django.py create mode 100644 bonobo/ext/google.py diff --git a/bonobo/events.py b/bonobo/events.py new file mode 100644 index 0000000..9a0cbba --- /dev/null +++ b/bonobo/events.py @@ -0,0 +1,3 @@ +ON_START = 'bonobo.on_start' +ON_TICK = 'bonobo.on_tick' +ON_STOP = 'bonobo.on_stop' diff --git a/bonobo/execution/base.py b/bonobo/execution/base.py index 81ac74e..b9bce36 100644 --- a/bonobo/execution/base.py +++ b/bonobo/execution/base.py @@ -4,8 +4,7 @@ from time import sleep from bonobo.config import create_container from bonobo.config.processors import ContextCurrifier -from bonobo.plugins import get_enhancers -from bonobo.util import inspect_node, isconfigurabletype +from bonobo.util import isconfigurabletype from bonobo.util.errors import print_error from bonobo.util.objects import Wrapper, get_name @@ -56,9 +55,6 @@ class LoopingExecutionContext(Wrapper): self._started, self._stopped = False, False self._stack = None - # XXX enhancers - self._enhancers = get_enhancers(self.wrapped) - def __enter__(self): self.start() return self @@ -79,15 +75,9 @@ class LoopingExecutionContext(Wrapper): raise TypeError( 'The Configurable should be fully instanciated by now, unfortunately I got a PartiallyConfigured object...' ) - # XXX enhance that, maybe giving hints on what's missing. - # print(inspect_node(self.wrapped)) self._stack.setup(self) - for enhancer in self._enhancers: - with unrecoverable(self.handle_error): - enhancer.start(self) - def loop(self): """Generic loop. A bit boring. """ while self.alive: diff --git a/bonobo/execution/graph.py b/bonobo/execution/graph.py index 77e01fa..33e77bc 100644 --- a/bonobo/execution/graph.py +++ b/bonobo/execution/graph.py @@ -5,6 +5,7 @@ from bonobo.config import create_container from bonobo.constants import BEGIN, END from bonobo.execution.node import NodeExecutionContext from bonobo.execution.plugin import PluginExecutionContext +from whistle import EventDispatcher class GraphExecutionContext: @@ -23,6 +24,14 @@ class GraphExecutionContext: def alive(self): return any(node.alive for node in self.nodes) + @property + def dispatcher(self): + try: + return self._dispatcher + except AttributeError: + self._dispatcher = EventDispatcher() + return self._dispatcher + def __init__(self, graph, plugins=None, services=None): self.graph = graph self.nodes = [self.create_node_execution_context_for(node) for node in self.graph] diff --git a/bonobo/execution/plugin.py b/bonobo/execution/plugin.py index a207f23..3379fc0 100644 --- a/bonobo/execution/plugin.py +++ b/bonobo/execution/plugin.py @@ -13,14 +13,14 @@ class PluginExecutionContext(LoopingExecutionContext): super().start() with recoverable(self.handle_error): - self.wrapped.initialize() + self.wrapped.on_start() def shutdown(self): if self.started: with recoverable(self.handle_error): - self.wrapped.finalize() + self.wrapped.on_stop() self.alive = False def step(self): with recoverable(self.handle_error): - self.wrapped.run() + self.wrapped.on_tick() diff --git a/bonobo/ext/console.py b/bonobo/ext/console.py index 1053e04..02bcafa 100644 --- a/bonobo/ext/console.py +++ b/bonobo/ext/console.py @@ -1,6 +1,6 @@ import io import sys -from contextlib import redirect_stdout +from contextlib import redirect_stdout, redirect_stderr from colorama import Style, Fore, init @@ -50,35 +50,50 @@ class ConsoleOutputPlugin(Plugin): """ - def initialize(self): + # Standard outputs descriptors backup here, also used to override if needed. + _stdout = sys.stdout + _stderr = sys.stderr + + # When the plugin is started, we'll set the real value of this. + isatty = False + + # Whether we're on windows, or a real operating system. + iswindows = (sys.platform == 'win32') + + def on_start(self): self.prefix = '' self.counter = 0 self._append_cache = '' - self.isatty = sys.stdout.isatty() - self.iswindows = (sys.platform == 'win32') - self._stdout = sys.stdout + self.isatty = self._stdout.isatty() + self.stdout = IOBuffer() self.redirect_stdout = redirect_stdout(self._stdout if self.iswindows else self.stdout) self.redirect_stdout.__enter__() - def run(self): + self.stderr = IOBuffer() + self.redirect_stderr = redirect_stderr(self._stderr if self.iswindows else self.stderr) + self.redirect_stderr.__enter__() + + def on_tick(self): if self.isatty and not self.iswindows: self._write(self.context.parent, rewind=True) else: pass # not a tty, or windows, so we'll ignore stats output - def finalize(self): + def on_stop(self): self._write(self.context.parent, rewind=False) + self.redirect_stderr.__exit__(None, None, None) self.redirect_stdout.__exit__(None, None, None) def write(self, context, prefix='', rewind=True, append=None): t_cnt = len(context) if not self.iswindows: - buffered = self.stdout.switch() - for line in buffered.split('\n')[:-1]: - print(line + CLEAR_EOL, file=sys.stderr) + for line in self.stdout.switch().split('\n')[:-1]: + print(line + CLEAR_EOL, file=self._stdout) + for line in self.stderr.switch().split('\n')[:-1]: + print(line + CLEAR_EOL, file=self._stderr) alive_color = Style.BRIGHT dead_color = Style.BRIGHT + Fore.BLACK @@ -117,7 +132,7 @@ class ConsoleOutputPlugin(Plugin): ' ', ) ) - print(prefix + _line + '\033[0K', file=sys.stderr) + print(prefix + _line + CLEAR_EOL, file=self._stderr) if append: # todo handle multiline @@ -128,13 +143,13 @@ class ConsoleOutputPlugin(Plugin): CLEAR_EOL ) ), - file=sys.stderr + file=self._stderr ) t_cnt += 1 if rewind: - print(CLEAR_EOL, file=sys.stderr) - print(MOVE_CURSOR_UP(t_cnt + 2), file=sys.stderr) + print(CLEAR_EOL, file=self._stderr) + print(MOVE_CURSOR_UP(t_cnt + 2), file=self._stderr) def _write(self, graph_context, rewind): if settings.PROFILE.get(): @@ -154,4 +169,4 @@ class ConsoleOutputPlugin(Plugin): def memory_usage(): import os, psutil process = psutil.Process(os.getpid()) - return process.memory_info()[0] / float(2**20) + return process.memory_info()[0] / float(2 ** 20) diff --git a/bonobo/ext/django.py b/bonobo/ext/django.py new file mode 100644 index 0000000..1bd3fff --- /dev/null +++ b/bonobo/ext/django.py @@ -0,0 +1,47 @@ +from colorama import Fore, Back, Style +from django.core.management.base import BaseCommand, OutputWrapper +from logging import getLogger + +import bonobo +import bonobo.util +from bonobo.commands.run import get_default_services +from bonobo.ext.console import ConsoleOutputPlugin +from bonobo.util.term import CLEAR_EOL + +class ETLCommand(BaseCommand): + GraphType = bonobo.Graph + + def get_graph(self, *args, **options): + def not_implemented(): + raise NotImplementedError('You must implement {}.get_graph() method.'.format(self)) + + return self.GraphType(not_implemented) + + def get_services(self): + return get_default_services(type(self).__file__) + + @property + def logger(self): + try: + return self._logger + except AttributeError: + self._logger = getLogger(type(self).__module__) + return self._logger + + def info(self, *args, **kwargs): + self.logger.info(*args, **kwargs) + + def handle(self, *args, **options): + _stdout_backup, _stderr_backup = self.stdout, self.stderr + + self.stdout = OutputWrapper(ConsoleOutputPlugin._stdout, ending=CLEAR_EOL + '\n') + self.stderr = OutputWrapper(ConsoleOutputPlugin._stderr, ending=CLEAR_EOL + '\n') + self.stderr.style_func = lambda x: Fore.LIGHTRED_EX + Back.RED + '!' + Style.RESET_ALL + ' ' + x + + result = bonobo.run( + self.get_graph(*args, **options), + services=self.get_services(), + ) + self.stdout = _stdout_backup + + return '\nReturn Value: ' + str(result) diff --git a/bonobo/ext/google.py b/bonobo/ext/google.py new file mode 100644 index 0000000..920af9d --- /dev/null +++ b/bonobo/ext/google.py @@ -0,0 +1,43 @@ +import os + +import httplib2 +from apiclient import discovery +from oauth2client import client, tools +from oauth2client.file import Storage +from oauth2client.tools import argparser + +HOME_DIR = os.path.expanduser('~') +GOOGLE_SCOPES = ('https://www.googleapis.com/auth/spreadsheets',) +GOOGLE_SECRETS = os.path.join(HOME_DIR, '.cache/secrets/client_secrets.json') + + +def get_credentials(): + """Gets valid user credentials from storage. + + If nothing has been stored, or if the stored credentials are invalid, + the OAuth2 flow is completed to obtain the new credentials. + + Returns: + Credentials, the obtained credential. + """ + credential_dir = os.path.join(HOME_DIR, '.cache', __package__, 'credentials') + if not os.path.exists(credential_dir): + os.makedirs(credential_dir) + credential_path = os.path.join(credential_dir, 'googleapis.json') + + store = Storage(credential_path) + credentials = store.get() + if not credentials or credentials.invalid: + flow = client.flow_from_clientsecrets(GOOGLE_SECRETS, GOOGLE_SCOPES) + flow.user_agent = 'Bonobo ETL (https://www.bonobo-project.org/)' + flags = argparser.parse_args(['--noauth_local_webserver']) + credentials = tools.run_flow(flow, store, flags) + print('Storing credentials to ' + credential_path) + return credentials + + +def get_google_spreadsheets_api_client(): + credentials = get_credentials() + http = credentials.authorize(httplib2.Http()) + discoveryUrl = 'https://sheets.googleapis.com/$discovery/rest?version=v4' + return discovery.build('sheets', 'v4', http=http, discoveryServiceUrl=discoveryUrl, cache_discovery=False) diff --git a/bonobo/plugins.py b/bonobo/plugins.py index 4fa1e18..7a0f5d1 100644 --- a/bonobo/plugins.py +++ b/bonobo/plugins.py @@ -1,7 +1,3 @@ -from bonobo.config import Configurable -from bonobo.util.objects import get_attribute_or_create - - class Plugin: """ A plugin is an extension to the core behavior of bonobo. If you're writing transformations, you should not need @@ -11,30 +7,8 @@ class Plugin: respectively permits an interactive output on an ANSI console and a rich output in a jupyter notebook. Warning: THE PLUGIN API IS PRE-ALPHA AND WILL EVOLVE BEFORE 1.0, DO NOT RELY ON IT BEING STABLE! - + """ def __init__(self, context): self.context = context - - def initialize(self): - pass - - def run(self): - pass - - def finalize(self): - pass - - -def get_enhancers(obj): - try: - return get_attribute_or_create(obj, '__enhancers__', list()) - except AttributeError: - return list() - - -class NodeEnhancer(Configurable): - def __matmul__(self, other): - get_enhancers(other).append(self) - return other diff --git a/bonobo/strategies/executor.py b/bonobo/strategies/executor.py index 3bfabc6..57e2524 100644 --- a/bonobo/strategies/executor.py +++ b/bonobo/strategies/executor.py @@ -6,6 +6,7 @@ from bonobo.constants import BEGIN, END from bonobo.strategies.base import Strategy from bonobo.structs.bags import Bag from bonobo.util.errors import print_error +from whistle import EventDispatcher class ExecutorStrategy(Strategy): diff --git a/docs/conf.py b/docs/conf.py index afbbe83..93895a8 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -1,8 +1,9 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -import sys +import datetime import os +import sys sys.path.insert(0, os.path.abspath('..')) sys.path.insert(0, os.path.abspath('_themes')) @@ -36,8 +37,8 @@ master_doc = 'index' # General information about the project. project = 'Bonobo' -copyright = '2012-2017, Romain Dorgueil' author = 'Romain Dorgueil' +copyright = '2012-{}, {}'.format(datetime.datetime.now().year, author) # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the