diff --git a/bonobo/contrib/django/commands.py b/bonobo/contrib/django/commands.py index 457273d..8729b12 100644 --- a/bonobo/contrib/django/commands.py +++ b/bonobo/contrib/django/commands.py @@ -1,4 +1,5 @@ from logging import getLogger +from types import GeneratorType import bonobo from bonobo.plugins.console import ConsoleOutputPlugin @@ -6,6 +7,7 @@ from bonobo.util.term import CLEAR_EOL from colorama import Fore, Back, Style from django.core.management import BaseCommand from django.core.management.base import OutputWrapper +from mondrian import term from .utils import create_or_update @@ -44,11 +46,17 @@ class ETLCommand(BaseCommand): self.stderr.style_func = lambda x: Fore.LIGHTRED_EX + Back.RED + '!' + Style.RESET_ALL + ' ' + x with bonobo.parse_args(options) as options: - result = bonobo.run( - self.get_graph(*args, **options), - services=self.get_services(), - ) + services = self.get_services() + graph_coll = self.get_graph(*args, **options) + + if not isinstance(graph_coll, GeneratorType): + graph_coll = (graph_coll,) + + for i, graph in enumerate(graph_coll): + assert isinstance(graph, bonobo.Graph), 'Invalid graph provided.' + print(term.lightwhite('{}. {}'.format(i + 1, graph.name))) + result = bonobo.run(graph, services=services) + print(term.lightblack(' ... return value: ' + str(result))) + print() self.stdout, self.stderr = _stdout_backup, _stderr_backup - - return '\nReturn Value: ' + str(result) diff --git a/bonobo/structs/graphs.py b/bonobo/structs/graphs.py index 58b78a6..a18fc45 100644 --- a/bonobo/structs/graphs.py +++ b/bonobo/structs/graphs.py @@ -3,11 +3,10 @@ import json from collections import namedtuple from copy import copy -from graphviz import ExecutableNotFound -from graphviz.dot import Digraph - from bonobo.constants import BEGIN from bonobo.util import get_name +from graphviz import ExecutableNotFound +from graphviz.dot import Digraph GraphRange = namedtuple('GraphRange', ['graph', 'input', 'output']) @@ -16,6 +15,7 @@ class Graph: """ Represents a directed graph of nodes. """ + name = '' def __init__(self, *chain): self.edges = {BEGIN: set()} diff --git a/docs/guide/graphs.rst b/docs/guide/graphs.rst index 61f6e72..e2dd7b8 100644 --- a/docs/guide/graphs.rst +++ b/docs/guide/graphs.rst @@ -5,6 +5,51 @@ Graphs are the glue that ties transformations together. They are the only data-s must be acyclic, and can contain as many nodes as your system can handle. However, although in theory the number of nodes can be rather high, practical use cases usually do not exceed more than a few hundred nodes and only then in extreme cases. +Within a graph, each node are isolated and can only communicate using their +input and output queues. For each input row, a given node will be called with +the row passed as arguments. Each *return* or *yield* value will be put on the +node's output queue, and the nodes connected in the graph will then be able to +process it. + +|bonobo| is a line-by-line data stream processing solution. + +Handling the data-flow this way brings the following properties: + +- **First in, first out**: unless stated otherwise, each node will receeive the + rows from FIFO queues, and so, the order of rows will be preserved. That is + true for each single node, but please note that if you define "graph bubbles" + (where a graph diverge in different branches then converge again), the + convergence node will receive rows FIFO from each input queue, meaning that + the order existing at the divergence point wont stay true at the convergence + point. + +- **Parallelism**: each node run in parallel (by default, using independant + threads). This is useful as you don't have to worry about blocking calls. + If a thread waits for, let's say, a database, or a network service, the other + nodes will continue handling data, as long as they have input rows available. + +- **Independance**: the rows are independant from each other, making this way + of working with data flows good for line-by-line data processing, but + also not ideal for "grouped" computations (where an output depends on more + than one line of input data). You can overcome this with rolling windows if + the input required are adjacent rows, but if you need to work on the whole + dataset at once, you should consider other software. + +Graphs are defined using :class:`bonobo.Graph` instances, as seen in the +previous tutorial step. + +What can be a node? +::::::::::::::::::: + +**TL;DR**: … anything, as long as it’s callable(). + +Functions +--------- + +.. code-block:: python + + def get_item(id): + return id, items.get(id) diff --git a/docs/guide/packaging.rst b/docs/guide/packaging.rst new file mode 100644 index 0000000..e69de29