starting to write docs, taking decisions on public api

This commit is contained in:
Romain Dorgueil
2016-12-27 13:31:38 +01:00
parent 512e2ab46d
commit 25ad284935
29 changed files with 604 additions and 96 deletions

View File

@ -36,16 +36,20 @@ with open(os.path.realpath(os.path.join(os.path.dirname(__file__), '../version.t
__all__ = [
'Bag',
'FileWriter',
'Graph',
'NaiveStrategy',
'JsonFileWriter',
'NOT_MODIFIED',
'NaiveStrategy',
'ProcessPoolExecutorStrategy',
'ThreadPoolExecutorStrategy',
'console_run',
'head',
'inject',
'jupyter_run',
'log',
'noop',
'run',
'service',
'tee',
'to_json',
]

View File

9
bonobo/compat/pandas.py Normal file
View File

@ -0,0 +1,9 @@
from bonobo import FileWriter, JsonFileWriter
to_file = FileWriter
to_json = JsonFileWriter
__all__ = [
'to_json',
'to_file',
]

View File

@ -33,8 +33,8 @@ class Bag:
def flags(self):
return self._flags
def apply(self, func, *args, **kwargs):
return func(*args, *self.args, **kwargs, **self.kwargs)
def apply(self, func_or_iter, *args, **kwargs):
return func_or_iter(*args, *self.args, **kwargs, **self.kwargs)
def extend(self, *args, **kwargs):
return type(self)(*args, _parent=self, **kwargs)

View File

@ -128,7 +128,7 @@ class ComponentExecutionContext(WithStatistics, AbstractLoopContext):
@property
def name(self):
return self.component.__name__
return getattr(self.component, '__name__', getattr(type(self.component), '__name__', repr(self.component)))
def __init__(self, component, parent):
self.parent = parent

View File

@ -6,9 +6,10 @@ class Graph:
Represents a coherent directed acyclic graph (DAG) of components.
"""
def __init__(self):
def __init__(self, *chain):
self.components = []
self.graph = {BEGIN: set()}
self.add_chain(*chain)
def outputs_of(self, idx, create=False):
if create and not idx in self.graph:

View File

@ -1,7 +1,3 @@
from .helpers import console_run
from .plugin import ConsoleOutputPlugin
__all__ = [
'ConsoleOutputPlugin',
'console_run',
]
__all__ = ['ConsoleOutputPlugin', ]

View File

@ -1,9 +0,0 @@
from bonobo import Graph, ThreadPoolExecutorStrategy
from .plugin import ConsoleOutputPlugin
def console_run(*chain, output=True, plugins=None):
graph = Graph()
executor = ThreadPoolExecutorStrategy()
graph.add_chain(*chain)
return executor.execute(graph, plugins=(plugins or []) + [ConsoleOutputPlugin()] if output else [])

View File

@ -1,4 +1,4 @@
from .helpers import jupyter_run
from bonobo.util.helpers import jupyter_run
from .plugin import JupyterOutputPlugin
@ -6,7 +6,4 @@ def _jupyter_nbextension_paths():
return [{'section': 'notebook', 'src': 'static', 'dest': 'bonobo-jupyter', 'require': 'bonobo-jupyter/extension'}]
__all__ = [
'JupyterOutputPlugin',
'jupyter_run',
]
__all__ = ['JupyterOutputPlugin', ]

View File

@ -1,9 +1 @@
from bonobo import Graph, ThreadPoolExecutorStrategy
from .plugin import JupyterOutputPlugin
def jupyter_run(*chain, plugins=None):
graph = Graph()
executor = ThreadPoolExecutorStrategy()
graph.add_chain(*chain)
return executor.execute(graph, plugins=(plugins or []) + [JupyterOutputPlugin()])

View File

@ -3,11 +3,18 @@ from urllib.parse import urlencode
import requests # todo: make this a service so we can substitute it ?
def extract_ods(url, dataset, rows=100, **kwargs):
def from_opendatasoft_api(dataset=None,
endpoint='{scheme}://{netloc}{path}',
scheme='https',
netloc='data.opendatasoft.com',
path='/api/records/1.0/search/',
rows=100,
**kwargs):
path = path if path.startswith('/') else '/' + path
params = (
('dataset', dataset),
('rows', rows), ) + tuple(sorted(kwargs.items()))
base_url = url + '?' + urlencode(params)
base_url = endpoint.format(scheme=scheme, netloc=netloc, path=path) + '?' + urlencode(params)
def _extract_ods():
nonlocal base_url, rows

View File

@ -1,5 +1,9 @@
""" Readers and writers for common file formats. """
from .json import *
from .file import FileWriter
from .json import JsonFileWriter
__all__ = ['to_json', ]
__all__ = [
'FileWriter',
'JsonFileWriter',
]

35
bonobo/io/file.py Normal file
View File

@ -0,0 +1,35 @@
from bonobo.util.lifecycle import with_context
__all__ = ['FileWriter', ]
@with_context
class FileWriter:
# XXX TODO implement @with_context like this ? Pros and cons ?
class Meta:
contextual = True
def __init__(self, path_or_buf, eol='\n'):
self.path_or_buf = path_or_buf
self.eol = eol
def initialize(self, ctx):
""" todo add lock file ? optional maybe ? """
assert not hasattr(ctx, 'fp'), 'One at a time, baby.'
ctx.fp = open(self.path_or_buf, 'w+')
ctx.first = True
def write(self, fp, line, prefix=''):
fp.write(prefix + line)
def __call__(self, ctx, row):
if ctx.first:
prefix, ctx.first = '', False
else:
prefix = self.eol
self.write(ctx.fp, row, prefix=prefix)
def finalize(self, ctx):
ctx.fp.close()
del ctx.fp, ctx.first

View File

@ -1,40 +1,23 @@
import json
from .file import FileWriter
from bonobo.util.lifecycle import with_context
__all__ = [
'from_json',
'to_json',
]
__all__ = ['JsonFileWriter', ]
@with_context
class JsonWriter:
class JsonFileWriter(FileWriter):
def __init__(self, path_or_buf):
self.path_or_buf = path_or_buf
super().__init__(path_or_buf, eol=',\n')
def initialize(self, ctx):
assert not hasattr(ctx, 'fp'), 'One at a time, baby.'
ctx.fp = open(self.path_or_buf, 'w+')
super().initialize(ctx)
ctx.fp.write('[\n')
ctx.first = True
def __call__(self, ctx, row):
if ctx.first:
prefix = ''
ctx.first = False
else:
prefix = ',\n'
ctx.fp.write(prefix + json.dumps(row))
def write(self, fp, line, prefix=''):
fp.write(prefix + json.dumps(line))
def finalize(self, ctx):
ctx.fp.write('\n]')
ctx.fp.close()
del ctx.fp, ctx.first
def from_json(path_or_buf):
pass
to_json = JsonWriter
super().finalize(ctx)

View File

@ -4,12 +4,16 @@ import functools
import pprint
from .tokens import NOT_MODIFIED
from .helpers import run, console_run, jupyter_run
__all__ = [
'NOT_MODIFIED',
'console_run',
'head',
'jupyter_run',
'log',
'noop',
'run',
'tee',
]

20
bonobo/util/helpers.py Normal file
View File

@ -0,0 +1,20 @@
def run(*chain, plugins=None):
from bonobo import Graph, ThreadPoolExecutorStrategy
graph = Graph()
graph.add_chain(*chain)
executor = ThreadPoolExecutorStrategy()
return executor.execute(graph, plugins=plugins or [])
def console_run(*chain, output=True, plugins=None):
from bonobo.ext.console import ConsoleOutputPlugin
return run(*chain, plugins=(plugins or []) + [ConsoleOutputPlugin()] if output else [])
def jupyter_run(*chain, plugins=None):
from bonobo.ext.jupyter import JupyterOutputPlugin
return run(*chain, plugins=(plugins or []) + [JupyterOutputPlugin()])