Merge remote-tracking branch 'origin/develop' into develop

This commit is contained in:
Romain Dorgueil
2018-01-14 16:22:07 +01:00
4 changed files with 62 additions and 9 deletions

View File

@ -1,4 +1,5 @@
from logging import getLogger
from types import GeneratorType
import bonobo
from bonobo.plugins.console import ConsoleOutputPlugin
@ -6,6 +7,7 @@ from bonobo.util.term import CLEAR_EOL
from colorama import Fore, Back, Style
from django.core.management import BaseCommand
from django.core.management.base import OutputWrapper
from mondrian import term
from .utils import create_or_update
@ -44,11 +46,17 @@ class ETLCommand(BaseCommand):
self.stderr.style_func = lambda x: Fore.LIGHTRED_EX + Back.RED + '!' + Style.RESET_ALL + ' ' + x
with bonobo.parse_args(options) as options:
result = bonobo.run(
self.get_graph(*args, **options),
services=self.get_services(),
)
services = self.get_services()
graph_coll = self.get_graph(*args, **options)
if not isinstance(graph_coll, GeneratorType):
graph_coll = (graph_coll,)
for i, graph in enumerate(graph_coll):
assert isinstance(graph, bonobo.Graph), 'Invalid graph provided.'
print(term.lightwhite('{}. {}'.format(i + 1, graph.name)))
result = bonobo.run(graph, services=services)
print(term.lightblack(' ... return value: ' + str(result)))
print()
self.stdout, self.stderr = _stdout_backup, _stderr_backup
return '\nReturn Value: ' + str(result)

View File

@ -3,11 +3,10 @@ import json
from collections import namedtuple
from copy import copy
from graphviz import ExecutableNotFound
from graphviz.dot import Digraph
from bonobo.constants import BEGIN
from bonobo.util import get_name
from graphviz import ExecutableNotFound
from graphviz.dot import Digraph
GraphRange = namedtuple('GraphRange', ['graph', 'input', 'output'])
@ -16,6 +15,7 @@ class Graph:
"""
Represents a directed graph of nodes.
"""
name = ''
def __init__(self, *chain):
self.edges = {BEGIN: set()}

View File

@ -5,6 +5,51 @@ Graphs are the glue that ties transformations together. They are the only data-s
must be acyclic, and can contain as many nodes as your system can handle. However, although in theory the number of nodes can be rather high, practical use cases usually do not exceed a few hundred nodes, and even that only in extreme cases.
Within a graph, each node is isolated and can only communicate using its
input and output queues. For each input row, a given node will be called with
the row passed as arguments. Each *return* or *yield* value will be put on the
node's output queue, and the nodes connected in the graph will then be able to
process it.
|bonobo| is a line-by-line data stream processing solution.
Handling the data-flow this way brings the following properties:
- **First in, first out**: unless stated otherwise, each node will receive the
rows from FIFO queues, and so, the order of rows will be preserved. That is
true for each single node, but please note that if you define "graph bubbles"
(where a graph diverges into different branches then converges again), the
convergence node will receive rows FIFO from each input queue, meaning that
the order existing at the divergence point won't stay true at the convergence
point.
- **Parallelism**: each node runs in parallel (by default, using independent
threads). This is useful as you don't have to worry about blocking calls.
If a thread waits for, let's say, a database, or a network service, the other
nodes will continue handling data, as long as they have input rows available.
- **Independence**: the rows are independent of each other, making this way
of working with data flows good for line-by-line data processing, but
also not ideal for "grouped" computations (where an output depends on more
than one line of input data). You can overcome this with rolling windows if
the inputs required are adjacent rows, but if you need to work on the whole
dataset at once, you should consider other software.
Graphs are defined using :class:`bonobo.Graph` instances, as seen in the
previous tutorial step.
What can be a node?
:::::::::::::::::::
**TL;DR**: … anything, as long as it's callable().
Functions
---------
.. code-block:: python
def get_item(id):
return id, items.get(id)

0
docs/guide/packaging.rst Normal file
View File