@ -2,7 +2,7 @@ import logging
|
||||
|
||||
from bonobo.structs import Bag, Graph, Token
|
||||
from bonobo.nodes import CsvReader, CsvWriter, FileReader, FileWriter, Filter, JsonReader, JsonWriter, Limit, \
|
||||
PrettyPrinter, PickleWriter, PickleReader, RateLimited, Tee, count, identity, noop
|
||||
PrettyPrinter, PickleWriter, PickleReader, RateLimited, Tee, count, identity, noop, arg0_to_kwargs, kwargs_to_arg0
|
||||
from bonobo.strategies import create_strategy
|
||||
from bonobo.util.objects import get_name
|
||||
|
||||
@ -108,13 +108,15 @@ register_api_group(
|
||||
JsonReader,
|
||||
JsonWriter,
|
||||
Limit,
|
||||
PrettyPrinter,
|
||||
PickleReader,
|
||||
PickleWriter,
|
||||
PrettyPrinter,
|
||||
RateLimited,
|
||||
Tee,
|
||||
arg0_to_kwargs,
|
||||
count,
|
||||
identity,
|
||||
kwargs_to_arg0,
|
||||
noop,
|
||||
)
|
||||
|
||||
|
||||
@ -10,11 +10,13 @@ from bonobo.util.objects import ValueHolder
|
||||
from bonobo.util.term import CLEAR_EOL
|
||||
|
||||
__all__ = [
|
||||
'identity',
|
||||
'Limit',
|
||||
'Tee',
|
||||
'count',
|
||||
'PrettyPrinter',
|
||||
'Tee',
|
||||
'arg0_to_kwargs',
|
||||
'count',
|
||||
'identity',
|
||||
'kwargs_to_arg0',
|
||||
'noop',
|
||||
]
|
||||
|
||||
@ -86,3 +88,25 @@ class PrettyPrinter(Configurable):
|
||||
def noop(*args, **kwargs): # pylint: disable=unused-argument
|
||||
from bonobo.constants import NOT_MODIFIED
|
||||
return NOT_MODIFIED
|
||||
|
||||
|
||||
def arg0_to_kwargs(row):
|
||||
"""
|
||||
Transform items in a stream from "arg0" format (each call only has one positional argument, which is a dict-like
|
||||
object) to "kwargs" format (each call only has keyword arguments that represent a row).
|
||||
|
||||
:param row:
|
||||
:return: bonobo.Bag
|
||||
"""
|
||||
return Bag(**row)
|
||||
|
||||
|
||||
def kwargs_to_arg0(**row):
|
||||
"""
|
||||
Transform items in a stream from "kwargs" format (each call only has keyword arguments that represent a row) to
|
||||
"arg0" format (each call only has one positional argument, which is a dict-like object) .
|
||||
|
||||
:param **row:
|
||||
:return: bonobo.Bag
|
||||
"""
|
||||
return Bag(row)
|
||||
|
||||
@ -59,7 +59,7 @@ available in **Bonobo**'s repository:
|
||||
|
||||
.. code-block:: shell-session
|
||||
|
||||
$ curl https://raw.githubusercontent.com/python-bonobo/bonobo/master/bonobo/examples/datasets/coffeeshops.txt > `python -c 'import bonobo; print(bonobo.get_examples_path("datasets/coffeeshops.txt"))'`
|
||||
$ curl https://raw.githubusercontent.com/python-bonobo/bonobo/master/bonobo/examples/datasets/coffeeshops.txt > `python3 -c 'import bonobo; print(bonobo.get_examples_path("datasets/coffeeshops.txt"))'`
|
||||
|
||||
.. note::
|
||||
|
||||
|
||||
@ -18,8 +18,8 @@ specialized packages, like SQLAlchemy, or other database access libraries from t
|
||||
First, read https://www.bonobo-project.org/with/sqlalchemy for instructions on how to install. You **do need** the
|
||||
bleeding edge version of `bonobo` and `bonobo-sqlalchemy` to make this work.
|
||||
|
||||
Additional requirements
|
||||
:::::::::::::::::::::::
|
||||
Requirements
|
||||
::::::::::::
|
||||
|
||||
Once you installed `bonobo_sqlalchemy` (read https://www.bonobo-project.org/with/sqlalchemy to use bleeding edge
|
||||
version), install the following additional packages:
|
||||
@ -39,17 +39,21 @@ Open your `_services.py` file and replace the code:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
import bonobo
|
||||
import dotenv
|
||||
|
||||
import bonobo, dotenv, logging, os
|
||||
from bonobo_sqlalchemy.util import create_postgresql_engine
|
||||
|
||||
dotenv.load_dotenv(dotenv.find_dotenv())
|
||||
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
|
||||
|
||||
def get_services():
|
||||
return {
|
||||
'fs': bonobo.open_fs(),
|
||||
'db': create_postgresql_engine(name='tutorial')
|
||||
'fs': bonobo.open_examples_fs('datasets'),
|
||||
'fs.output': bonobo.open_fs(),
|
||||
'sqlalchemy.engine': create_postgresql_engine(**{
|
||||
'name': 'tutorial',
|
||||
'user': 'tutorial',
|
||||
'pass': 'tutorial',
|
||||
})
|
||||
}
|
||||
|
||||
The `create_postgresql_engine` is a tiny function building the DSN from reasonable defaults, that you can override
|
||||
@ -58,6 +62,9 @@ file and add values for one or more of `POSTGRES_NAME`, `POSTGRES_USER`, 'POSTGR
|
||||
`POSTGRES_PORT`. Please note that kwargs always have precedence on environment, but that you should prefer using
|
||||
environment variables for anything that is not immutable from one platform to another.
|
||||
|
||||
Add database operation to the graph
|
||||
:::::::::::::::::::::::::::::::::::
|
||||
|
||||
Let's create a `tutorial/pgdb.py` job:
|
||||
|
||||
.. code-block:: python
|
||||
@ -106,6 +113,9 @@ If we run this transformation (with `bonobo run tutorial/pgdb.py`), we should ge
|
||||
The database we requested do not exist. It is not the role of bonobo to do database administration, and thus there is
|
||||
no tool here to create neither the database, nor the tables we want to use.
|
||||
|
||||
Create database and table
|
||||
:::::::::::::::::::::::::
|
||||
|
||||
There are however tools in `sqlalchemy` to manage tables, so we'll create the database by ourselves, and ask sqlalchemy
|
||||
to create the table:
|
||||
|
||||
@ -125,15 +135,15 @@ Now, let's use a little trick and add this section to `pgdb.py`:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
import logging, sys
|
||||
|
||||
from bonobo.commands.run import get_default_services
|
||||
import sys
|
||||
from sqlalchemy import Table, Column, String, Integer, MetaData
|
||||
|
||||
def main():
|
||||
from bonobo.commands.run import get_default_services
|
||||
services = get_default_services(__file__)
|
||||
|
||||
if len(sys.argv) == 2 and sys.argv[1] == 'reset':
|
||||
if len(sys.argv) == 1:
|
||||
return bonobo.run(graph, services=services)
|
||||
elif len(sys.argv) == 2 and sys.argv[1] == 'reset':
|
||||
engine = services.get('sqlalchemy.engine')
|
||||
metadata = MetaData()
|
||||
|
||||
@ -145,11 +155,10 @@ Now, let's use a little trick and add this section to `pgdb.py`:
|
||||
Column('address', String(255)),
|
||||
)
|
||||
|
||||
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
|
||||
metadata.drop_all(engine)
|
||||
metadata.create_all(engine)
|
||||
else:
|
||||
return bonobo.run(graph, services=services)
|
||||
raise NotImplementedError('I do not understand.')
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@ -167,6 +176,9 @@ Now run:
|
||||
|
||||
Database and table should now exist.
|
||||
|
||||
Format the data
|
||||
:::::::::::::::
|
||||
|
||||
Let's prepare our data for database, and change the `.add_chain(..)` call to do it prior to `InsertOrUpdate(...)`
|
||||
|
||||
.. code-block:: python
|
||||
@ -190,6 +202,9 @@ Let's prepare our data for database, and change the `.add_chain(..)` call to do
|
||||
_input=split_one_to_map
|
||||
)
|
||||
|
||||
Run!
|
||||
::::
|
||||
|
||||
You can now run the script (either with `bonobo run tutorial/pgdb.py` or directly with the python interpreter, as we
|
||||
added a "main" section) and the dataset should be inserted in your database. If you run it again, no new rows are
|
||||
created.
|
||||
|
||||
Reference in New Issue
Block a user