Files
bonobo/docs/guide/graphs.rst
2018-01-10 08:39:37 +01:00

297 lines
9.2 KiB
ReStructuredText
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

Graphs
======
Graphs are the glue that ties transformations together. They are the only data-structure bonobo can execute directly. Graphs
must be acyclic, and can contain as many nodes as your system can handle. However, although in theory the number of nodes can be rather high, practical use cases usually do not exceed more than a few hundred nodes and only then in extreme cases.
Within a graph, each node are isolated and can only communicate using their
input and output queues. For each input row, a given node will be called with
the row passed as arguments. Each *return* or *yield* value will be put on the
node's output queue, and the nodes connected in the graph will then be able to
process it.
|bonobo| is a line-by-line data stream processing solution.
Handling the data-flow this way brings the following properties:
- **First in, first out**: unless stated otherwise, each node will receeive the
rows from FIFO queues, and so, the order of rows will be preserved. That is
true for each single node, but please note that if you define "graph bubbles"
(where a graph diverge in different branches then converge again), the
convergence node will receive rows FIFO from each input queue, meaning that
the order existing at the divergence point wont stay true at the convergence
point.
- **Parallelism**: each node run in parallel (by default, using independant
threads). This is useful as you don't have to worry about blocking calls.
If a thread waits for, let's say, a database, or a network service, the other
nodes will continue handling data, as long as they have input rows available.
- **Independance**: the rows are independant from each other, making this way
of working with data flows good for line-by-line data processing, but
also not ideal for "grouped" computations (where an output depends on more
than one line of input data). You can overcome this with rolling windows if
the input required are adjacent rows, but if you need to work on the whole
dataset at once, you should consider other software.
Graphs are defined using :class:`bonobo.Graph` instances, as seen in the
previous tutorial step.
What can be a node?
:::::::::::::::::::
**TL;DR**: … anything, as long as its callable().
Functions
---------
.. code-block:: python
def get_item(id):
return id, items.get(id)
Each node of a graph will be executed in isolation from the other nodes, and the data is passed from one node to the
next using FIFO queues, managed by the framework. It's transparent to the end-user, though, and you'll only use
function arguments (for inputs) and return/yield values (for outputs).
Each input row of a node will cause one call to this node's callable. Each output is cast internally as a tuple-like
data structure (or more precisely, a namedtuple-like data structure), and for one given node, each output row must
have the same structure.
If you return/yield something which is not a tuple, bonobo will create a tuple of one element.
Properties
----------
|bonobo| assists you with defining the data-flow of your data engineering process, and then streams data through your
callable graphs.
* Each node call will process one row of data.
* Queues that flows the data between node are first-in, first-out (FIFO) standard python :class:`queue.Queue`.
* Each node will run in parallel
* Default execution strategy use threading, and each node will run in a separate thread.
Fault tolerance
---------------
Node execution is fault tolerant.
If an exception is raised from a node call, then this node call will be aborted but bonobo will continue the execution
with the next row (after outputing the stack trace and incrementing the "err" counter for the node context).
It allows to have ETL jobs that ignore faulty data and try their best to process the valid rows of a dataset.
Some errors are fatal, though.
If you pass a 2 elements tuple to a node that takes 3 args, |bonobo| will raise an :class:`bonobo.errors.UnrecoverableTypeError`, and exit the
current graph execution as fast as it can (finishing the other node executions that are in progress first, but not
starting new ones if there are remaining input rows).
Definitions
:::::::::::
Graph
A directed acyclic graph of transformations, that Bonobo can inspect and execute.
Node
A transformation within a graph. The transformations are stateless, and have no idea whether or not they are
included in a graph, multiple graph, or not at all.
Creating a graph
::::::::::::::::
Graphs should be instances of :class:`bonobo.Graph`. The :func:`bonobo.Graph.add_chain` method can take as many
positional parameters as you want.
.. code-block:: python
import bonobo
graph = bonobo.Graph()
graph.add_chain(a, b, c)
Resulting graph:
.. graphviz::
digraph {
rankdir = LR;
stylesheet = "../_static/graphs.css";
BEGIN [shape="point"];
BEGIN -> "a" -> "b" -> "c";
}
Non-linear graphs
:::::::::::::::::
Divergences / forks
-------------------
To create two or more divergent data streams ("forks"), you should specify the `_input` kwarg to `add_chain`.
.. code-block:: python
import bonobo
graph = bonobo.Graph()
graph.add_chain(a, b, c)
graph.add_chain(f, g, _input=b)
Resulting graph:
.. graphviz::
digraph {
rankdir = LR;
stylesheet = "../_static/graphs.css";
BEGIN [shape="point"];
BEGIN -> "a" -> "b" -> "c";
"b" -> "f" -> "g";
}
.. note:: Both branches will receive the same data and at the same time.
Convergence / merges
---------------------
To merge two data streams, you can use the `_output` kwarg to `add_chain`, or use named nodes (see below).
.. code-block:: python
import bonobo
graph = bonobo.Graph()
# Here we set _input to None, so normalize won't start on its own but only after it receives input from the other chains.
graph.add_chain(normalize, store, _input=None)
# Add two different chains
graph.add_chain(a, b, _output=normalize)
graph.add_chain(f, g, _output=normalize)
Resulting graph:
.. graphviz::
digraph {
rankdir = LR;
stylesheet = "../_static/graphs.css";
BEGIN [shape="point"];
BEGIN -> "a" -> "b" -> "normalize";
BEGIN2 [shape="point"];
BEGIN2 -> "f" -> "g" -> "normalize";
"normalize" -> "store"
}
.. note::
This is not a "join" or "cartesian product". Any data that comes from `b` or `g` will go through `normalize`, one at
a time. Think of the graph edges as data flow pipes.
Named nodes
:::::::::::
Using above code to create convergences often leads to code which is hard to read, because you have to define the "target" stream
before the streams that logically goes to the beginning of the transformation graph. To overcome that, one can use
"named" nodes:
graph.add_chain(x, y, z, _name='zed')
graph.add_chain(f, g, h, _input='zed')
.. code-block:: python
import bonobo
graph = bonobo.Graph()
# Add two different chains
graph.add_chain(a, b, _output="load")
graph.add_chain(f, g, _output="load")
# Here we mark _input to None, so normalize won't get the "begin" impulsion.
graph.add_chain(normalize, store, _input=None, _name="load")
Resulting graph:
.. graphviz::
digraph {
rankdir = LR;
stylesheet = "../_static/graphs.css";
BEGIN [shape="point"];
BEGIN -> "a" -> "b" -> "normalize (load)";
BEGIN2 [shape="point"];
BEGIN2 -> "f" -> "g" -> "normalize (load)";
"normalize (load)" -> "store"
}
Inspecting graphs
:::::::::::::::::
Bonobo is bundled with an "inspector", that can use graphviz to let you visualize your graphs.
Read `How to inspect and visualize your graph <https://www.bonobo-project.org/how-to/inspect-an-etl-jobs-graph>`_.
Executing graphs
::::::::::::::::
There are two options to execute a graph (which have a similar result, but are targeting different use cases).
* You can use the bonobo command line interface, which is the highest level interface.
* You can use the python API, which is lower level but allows to use bonobo from within your own code (for example, a
django management command).
Executing a graph with the command line interface
-------------------------------------------------
If there is no good reason not to, you should use `bonobo run ...` to run transformation graphs found in your python
source code files.
.. code-block:: shell-session
$ bonobo run file.py
You can also run a python module:
.. code-block:: shell-session
$ bonobo run -m my.own.etlmod
In each case, bonobo's CLI will look for an instance of :class:`bonobo.Graph` in your file/module, create the plumbing
needed to execute it, and run it.
If you're in an interactive terminal context, it will use :class:`bonobo.ext.console.ConsoleOutputPlugin` for display.
If you're in a jupyter notebook context, it will (try to) use :class:`bonobo.ext.jupyter.JupyterOutputPlugin`.
Executing a graph using the internal API
----------------------------------------
To integrate bonobo executions in any other python code, you should use :func:`bonobo.run`. It behaves very similar to
the CLI, and reading the source you should be able to figure out its usage quite easily.