Merge remote-tracking branch 'upstream/master' into develop

This commit is contained in:
Romain Dorgueil
2018-07-28 16:00:45 +01:00
12 changed files with 143 additions and 39 deletions

View File

@ -1,4 +1,4 @@
# Generated by Medikit 0.6.3 on 2018-07-22.
# Generated by Medikit 0.6.3 on 2018-07-28.
# All changes will be overriden.
# Edit Projectfile and run “make update” (or “medikit update”) to regenerate.

View File

@ -57,7 +57,7 @@ install:
# Upgrade to the latest version of pip to avoid it displaying warnings
# about it being out of date.
# - "pip install --disable-pip-version-check --user --upgrade pip"
- "python -m pip install --disable-pip-version-check --user --upgrade pip"
# Install the build dependencies of the project. If some dependencies contain
# compiled extensions and are not provided as pre-built wheel packages,

View File

@ -41,18 +41,17 @@ class ETLCommand(BaseCommand):
def get_services(self):
return {}
def get_strategy(self):
return None
def info(self, *args, **kwargs):
self.logger.info(*args, **kwargs)
def handle(self, *args, **options):
_stdout_backup, _stderr_backup = self.stdout, self.stderr
self.stdout = OutputWrapper(ConsoleOutputPlugin._stdout, ending=CLEAR_EOL + '\n')
self.stderr = OutputWrapper(ConsoleOutputPlugin._stderr, ending=CLEAR_EOL + '\n')
self.stderr.style_func = lambda x: Fore.LIGHTRED_EX + Back.RED + '!' + Style.RESET_ALL + ' ' + x
def run(self, *args, **options):
results = []
with bonobo.parse_args(options) as options:
services = self.get_services()
strategy = self.get_strategy()
graph_coll = self.get_graph(*args, **options)
if not isinstance(graph_coll, GeneratorType):
@ -61,8 +60,20 @@ class ETLCommand(BaseCommand):
for i, graph in enumerate(graph_coll):
assert isinstance(graph, bonobo.Graph), 'Invalid graph provided.'
print(term.lightwhite('{}. {}'.format(i + 1, graph.name)))
result = bonobo.run(graph, services=services)
result = bonobo.run(graph, services=services, strategy=strategy)
results.append(result)
print(term.lightblack(' ... return value: ' + str(result)))
print()
return results
def handle(self, *args, **options):
_stdout_backup, _stderr_backup = self.stdout, self.stderr
self.stdout = OutputWrapper(ConsoleOutputPlugin._stdout, ending=CLEAR_EOL + '\n')
self.stderr = OutputWrapper(ConsoleOutputPlugin._stderr, ending=CLEAR_EOL + '\n')
self.stderr.style_func = lambda x: Fore.LIGHTRED_EX + Back.RED + '!' + Style.RESET_ALL + ' ' + x
self.run(*args, **kwargs)
self.stdout, self.stderr = _stdout_backup, _stderr_backup

View File

@ -36,6 +36,25 @@ The `sqlalchemy.engine` name is the default name used by the provided transforma
example if you need more than one connection) and specify the service name using `engine='myengine'` while building your
transformations.
Lets create some tables and add some data. (You may need to edit the SQL if your database server uses a different
version of SQL.)
.. code-block:: sql
CREATE TABLE test_in (
id INTEGER PRIMARY KEY NOT NULL,
text TEXT
);
CREATE TABLE test_out (
id INTEGER PRIMARY KEY NOT NULL,
text TEXT
);
INSERT INTO test_in (id, text) VALUES (1, 'Cat');
INSERT INTO test_in (id, text) VALUES (2, 'Dog');
There are two transformation classes provided by this extension.
One reader, one writer.
@ -50,12 +69,29 @@ Let's select some data:
def get_graph():
graph = bonobo.Graph()
graph.add_chain(
bonobo_sqlalchemy.Select('SELECT * FROM example', limit=100),
bonobo_sqlalchemy.Select('SELECT * FROM test_in', limit=100),
bonobo.PrettyPrinter(),
)
return graph
And let's insert some data:
You should see:
.. code-block:: shell-session
$ python tutorial.py
│ id[0] = 1
│ text[1] = 'Cat'
│ id[0] = 2
│ text[1] = 'Dog'
- Select in=1 out=2 [done]
- PrettyPrinter in=2 out=2 [done]
Now let's insert some data:
.. code-block:: python
@ -66,12 +102,14 @@ And let's insert some data:
def get_graph(**options):
graph = bonobo.Graph()
graph.add_chain(
...,
bonobo_sqlalchemy.InsertOrUpdate('example')
bonobo_sqlalchemy.Select('SELECT * FROM test_in', limit=100),
bonobo_sqlalchemy.InsertOrUpdate('test_out')
)
return graph
If you check the `test_out` table, it should now have the data.
Reference
:::::::::

View File

@ -40,7 +40,7 @@ This will create a simple job in a `tutorial.py` file. Let's run it:
- transform in=2 out=2 [done]
- load in=2 [done]
If you have a similar result, then congratulations! You just ran your first |bonobo| ETL job.
Congratulations! You just ran your first |bonobo| ETL job.
Inspect your graph
@ -53,7 +53,12 @@ The basic building blocks of |bonobo| are **transformations** and **graphs**.
**Graphs** are a set of transformations, with directional links between them to define the data-flow that will happen
at runtime.
To inspect the graph of your first transformation (you must install graphviz first to do so), run:
To inspect the graph of your first transformation:
.. note::
You must `install the graphviz software first <https://www.graphviz.org/download/>`_. It is _not_ the python's graphviz
package, you must install it using your system's package manager (apt, brew, ...).
.. code-block:: shell-session
@ -242,6 +247,37 @@ The console output contains two things.
a call, but the execution will move to the next row.
However, if you run the tutorial.py it happens too fast and you can't see the status change. Let's add some delays to your code.
At the top of tutorial.py add a new import and add some delays to the 3 stages:
.. code-block:: python
import time
def extract():
"""Placeholder, change, rename, remove... """
time.sleep(5)
yield 'hello'
time.sleep(5)
yield 'world'
def transform(*args):
"""Placeholder, change, rename, remove... """
time.sleep(5)
yield tuple(
map(str.title, args)
)
def load(*args):
"""Placeholder, change, rename, remove... """
time.sleep(5)
print(*args)
Now run tutorial.py again, and you can see the status change during the process.
Wrap up
:::::::

View File

@ -41,15 +41,30 @@ Now, we need to write a `writer` transformation, and apply this context processo
@use_context_processor(with_opened_file)
def write_repr_to_file(f, *row):
f.write(repr(row))
f.write(repr(row) + "\n")
The `f` parameter will contain the value yielded by the context processors, in order of appearance (you can chain
multiple context processors).
The `f` parameter will contain the value yielded by the context processors, in order of appearance. You can chain
multiple context processors. To find about how to implement this, check the |bonobo| guides in the documentation.
Please note that the :func:`bonobo.config.use_context_processor` decorator will modify the function in place, but won't
modify its behaviour. If you want to call it out of the |bonobo| job context, it's your responsibility to provide
the right parameters (and here, the opened file).
To run this, change the last stage in the pipeline in get_graph to write_repr_to_file
.. code-block:: python
def get_graph(**options):
graph = bonobo.Graph()
graph.add_chain(
extract_fablabs,
bonobo.Limit(10),
write_repr_to_file,
)
return graph
Now run tutorial.py and check the output.txt file.
Using the filesystem
::::::::::::::::::::
@ -65,10 +80,10 @@ Let's rewrite our context processor to use it.
with context.get_service('fs').open('output.txt', 'w+') as f:
yield f
Interface does not change much, but this small change allows the end-user to change the filesystem implementation at
runtime, which is great to handle different environments (local development, staging servers, production, ...).
The interface does not change much, but this small change allows the end-user to change the filesystem implementation at
runtime, which is great for handling different environments (local development, staging servers, production, ...).
Note that |bonobo| only provide very few services with default implementation (actually, only `fs` and `http`), but
Note that |bonobo| only provides very few services with default implementation (actually, only `fs` and `http`), but
you can define all the services you want, depending on your system. You'll learn more about this in the next tutorial
chapter.
@ -122,16 +137,17 @@ function:
Reading from files
::::::::::::::::::
Reading from files is done using the same logic as writing, except that you'll probably have only one call to a reader.
Reading from files is done using the same logic as writing, except that you'll probably have only one call to a reader. You can read the file we just wrote by using a :obj:`bonobo.CsvReader` instance:
Our example application does not include reading from files, but you can read the file we just wrote by using a
:obj:`bonobo.CsvReader` instance.
.. code-block:: python
Atomic writes
:::::::::::::
.. include:: _todo.rst
def get_graph(**options):
graph = bonobo.Graph()
graph.add_chain(
bonobo.CsvReader('output.csv'),
...
)
return graph
Moving forward

View File

@ -3,7 +3,7 @@ Part 4: Services
All external dependencies (like filesystems, network clients, database connections, etc.) should be provided to
transformations as a service. It allows great flexibility, including the ability to test your transformations isolated
from the external world, and being friendly to the infrastructure guys (and if you're one of them, it's also nice to
from the external world, and being friendly to the infrastructure people (and if you're one of them, it's also nice to
treat yourself well).
In the last section, we used the `fs` service to access filesystems, we'll go even further by switching our `requests`

View File

@ -26,7 +26,7 @@ py==1.5.4
pygments==2.2.0
pyparsing==2.2.0
pytest-cov==2.5.1
pytest-timeout==1.3.0
pytest-timeout==1.3.1
pytest==3.6.3
python-dateutil==2.7.3
pytz==2018.5

View File

@ -7,14 +7,14 @@ chardet==3.0.4
colorama==0.3.9
docker-pycreds==0.3.0
docker==2.7.0
fs==2.0.25
fs==2.0.26
graphviz==0.8.4
idna==2.7
jinja2==2.10
markupsafe==1.0
mondrian==0.7.0
packaging==17.1
pbr==4.1.1
pbr==4.2.0
psutil==5.4.6
pyparsing==2.2.0
python-slugify==1.2.5
@ -23,6 +23,7 @@ requests==2.19.1
semantic-version==2.6.0
six==1.11.0
stevedore==1.29.0
typing==3.6.4
unidecode==1.0.22
urllib3==1.23
websocket-client==0.48.0

View File

@ -5,14 +5,14 @@ bonobo-sqlalchemy==0.6.0
certifi==2018.4.16
chardet==3.0.4
colorama==0.3.9
fs==2.0.25
fs==2.0.26
graphviz==0.8.4
idna==2.7
jinja2==2.10
markupsafe==1.0
mondrian==0.7.0
packaging==17.1
pbr==4.1.1
pbr==4.2.0
psutil==5.4.6
pyparsing==2.2.0
python-slugify==1.2.5
@ -21,6 +21,7 @@ requests==2.19.1
six==1.11.0
sqlalchemy==1.2.10
stevedore==1.29.0
typing==3.6.4
unidecode==1.0.22
urllib3==1.23
whistle==1.0.1

View File

@ -3,14 +3,14 @@ appdirs==1.4.3
certifi==2018.4.16
chardet==3.0.4
colorama==0.3.9
fs==2.0.25
fs==2.0.26
graphviz==0.8.4
idna==2.7
jinja2==2.10
markupsafe==1.0
mondrian==0.7.0
packaging==17.1
pbr==4.1.1
pbr==4.2.0
psutil==5.4.6
pyparsing==2.2.0
python-slugify==1.2.5
@ -18,6 +18,7 @@ pytz==2018.5
requests==2.19.1
six==1.11.0
stevedore==1.29.0
typing==3.6.4
unidecode==1.0.22
urllib3==1.23
whistle==1.0.1

View File

@ -1,4 +1,4 @@
# Generated by Medikit 0.6.3 on 2018-07-22.
# Generated by Medikit 0.6.3 on 2018-07-28.
# All changes will be overriden.
# Edit Projectfile and run “make update” (or “medikit update”) to regenerate.