diff --git a/bonobo/contrib/django/commands.py b/bonobo/contrib/django/commands.py index 457273d..8729b12 100644 --- a/bonobo/contrib/django/commands.py +++ b/bonobo/contrib/django/commands.py @@ -1,4 +1,5 @@ from logging import getLogger +from types import GeneratorType import bonobo from bonobo.plugins.console import ConsoleOutputPlugin @@ -6,6 +7,7 @@ from bonobo.util.term import CLEAR_EOL from colorama import Fore, Back, Style from django.core.management import BaseCommand from django.core.management.base import OutputWrapper +from mondrian import term from .utils import create_or_update @@ -44,11 +46,17 @@ class ETLCommand(BaseCommand): self.stderr.style_func = lambda x: Fore.LIGHTRED_EX + Back.RED + '!' + Style.RESET_ALL + ' ' + x with bonobo.parse_args(options) as options: - result = bonobo.run( - self.get_graph(*args, **options), - services=self.get_services(), - ) + services = self.get_services() + graph_coll = self.get_graph(*args, **options) + + if not isinstance(graph_coll, GeneratorType): + graph_coll = (graph_coll,) + + for i, graph in enumerate(graph_coll): + assert isinstance(graph, bonobo.Graph), 'Invalid graph provided.' + print(term.lightwhite('{}. {}'.format(i + 1, graph.name))) + result = bonobo.run(graph, services=services) + print(term.lightblack(' ... return value: ' + str(result))) + print() self.stdout, self.stderr = _stdout_backup, _stderr_backup - - return '\nReturn Value: ' + str(result) diff --git a/bonobo/structs/graphs.py b/bonobo/structs/graphs.py index 39de1fe..3e1fd23 100644 --- a/bonobo/structs/graphs.py +++ b/bonobo/structs/graphs.py @@ -3,11 +3,10 @@ import json from collections import namedtuple from copy import copy -from graphviz import ExecutableNotFound -from graphviz.dot import Digraph - from bonobo.constants import BEGIN from bonobo.util import get_name +from graphviz import ExecutableNotFound +from graphviz.dot import Digraph GraphRange = namedtuple('GraphRange', ['graph', 'input', 'output']) @@ -16,6 +15,7 @@ class Graph: """ Represents a directed graph of nodes. """ + name = '' def __init__(self, *chain): self.edges = {BEGIN: set()} diff --git a/docs/guide/packaging.rst b/docs/guide/packaging.rst new file mode 100644 index 0000000..e69de29 diff --git a/docs/tutorial/2-jobs.rst b/docs/tutorial/2-jobs.rst index e7d4baf..1edf19b 100644 --- a/docs/tutorial/2-jobs.rst +++ b/docs/tutorial/2-jobs.rst @@ -4,7 +4,54 @@ Part 2: Writing ETL Jobs What's an ETL job ? ::::::::::::::::::: -In |bonobo|, an ETL job is a formal definition of an executable graph. +In |bonobo|, an ETL job is a single graph that can be executed on its own. + +Within a graph, each node are isolated and can only communicate using their +input and output queues. For each input row, a given node will be called with +the row passed as arguments. Each *return* or *yield* value will be put on the +node's output queue, and the nodes connected in the graph will then be able to +process it. + +|bonobo| is a line-by-line data stream processing solution. + +Handling the data-flow this way brings the following properties: + +- **First in, first out**: unless stated otherwise, each node will receeive the + rows from FIFO queues, and so, the order of rows will be preserved. That is + true for each single node, but please note that if you define "graph bubbles" + (where a graph diverge in different branches then converge again), the + convergence node will receive rows FIFO from each input queue, meaning that + the order existing at the divergence point wont stay true at the convergence + point. + +- **Parallelism**: each node run in parallel (by default, using independant + threads). This is useful as you don't have to worry about blocking calls. + If a thread waits for, let's say, a database, or a network service, the other + nodes will continue handling data, as long as they have input rows available. + +- **Independance**: the rows are independant from each other, making this way + of working with data flows good for line-by-line data processing, but + also not ideal for "grouped" computations (where an output depends on more + than one line of input data). You can overcome this with rolling windows if + the input required are adjacent rows, but if you need to work on the whole + dataset at once, you should consider other software. + +Graphs are defined using :class:`bonobo.Graph` instances, as seen in the +previous tutorial step. + +What can be a node? +::::::::::::::::::: + +**TL;DR**: … anything, as long as it’s callable(). + +Functions +--------- + +.. code-block:: python + + def get_item(id): + return id, items.get(id) + Each node of a graph will be executed in isolation from the other nodes, and the data is passed from one node to the next using FIFO queues, managed by the framework. It's transparent to the end-user, though, and you'll only use