[django] Adds ability to create a get_graph() command method as a generator for multiple, synchronous jobs execution in django command.
This commit is contained in:
@ -1,4 +1,5 @@
|
|||||||
from logging import getLogger
|
from logging import getLogger
|
||||||
|
from types import GeneratorType
|
||||||
|
|
||||||
import bonobo
|
import bonobo
|
||||||
from bonobo.plugins.console import ConsoleOutputPlugin
|
from bonobo.plugins.console import ConsoleOutputPlugin
|
||||||
@ -6,6 +7,7 @@ from bonobo.util.term import CLEAR_EOL
|
|||||||
from colorama import Fore, Back, Style
|
from colorama import Fore, Back, Style
|
||||||
from django.core.management import BaseCommand
|
from django.core.management import BaseCommand
|
||||||
from django.core.management.base import OutputWrapper
|
from django.core.management.base import OutputWrapper
|
||||||
|
from mondrian import term
|
||||||
|
|
||||||
from .utils import create_or_update
|
from .utils import create_or_update
|
||||||
|
|
||||||
@ -44,11 +46,17 @@ class ETLCommand(BaseCommand):
|
|||||||
self.stderr.style_func = lambda x: Fore.LIGHTRED_EX + Back.RED + '!' + Style.RESET_ALL + ' ' + x
|
self.stderr.style_func = lambda x: Fore.LIGHTRED_EX + Back.RED + '!' + Style.RESET_ALL + ' ' + x
|
||||||
|
|
||||||
with bonobo.parse_args(options) as options:
|
with bonobo.parse_args(options) as options:
|
||||||
result = bonobo.run(
|
services = self.get_services()
|
||||||
self.get_graph(*args, **options),
|
graph_coll = self.get_graph(*args, **options)
|
||||||
services=self.get_services(),
|
|
||||||
)
|
if not isinstance(graph_coll, GeneratorType):
|
||||||
|
graph_coll = (graph_coll,)
|
||||||
|
|
||||||
|
for i, graph in enumerate(graph_coll):
|
||||||
|
assert isinstance(graph, bonobo.Graph), 'Invalid graph provided.'
|
||||||
|
print(term.lightwhite('{}. {}'.format(i + 1, graph.name)))
|
||||||
|
result = bonobo.run(graph, services=services)
|
||||||
|
print(term.lightblack(' ... return value: ' + str(result)))
|
||||||
|
print()
|
||||||
|
|
||||||
self.stdout, self.stderr = _stdout_backup, _stderr_backup
|
self.stdout, self.stderr = _stdout_backup, _stderr_backup
|
||||||
|
|
||||||
return '\nReturn Value: ' + str(result)
|
|
||||||
|
|||||||
@ -3,11 +3,10 @@ import json
|
|||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
from copy import copy
|
from copy import copy
|
||||||
|
|
||||||
from graphviz import ExecutableNotFound
|
|
||||||
from graphviz.dot import Digraph
|
|
||||||
|
|
||||||
from bonobo.constants import BEGIN
|
from bonobo.constants import BEGIN
|
||||||
from bonobo.util import get_name
|
from bonobo.util import get_name
|
||||||
|
from graphviz import ExecutableNotFound
|
||||||
|
from graphviz.dot import Digraph
|
||||||
|
|
||||||
GraphRange = namedtuple('GraphRange', ['graph', 'input', 'output'])
|
GraphRange = namedtuple('GraphRange', ['graph', 'input', 'output'])
|
||||||
|
|
||||||
@ -16,6 +15,7 @@ class Graph:
|
|||||||
"""
|
"""
|
||||||
Represents a directed graph of nodes.
|
Represents a directed graph of nodes.
|
||||||
"""
|
"""
|
||||||
|
name = ''
|
||||||
|
|
||||||
def __init__(self, *chain):
|
def __init__(self, *chain):
|
||||||
self.edges = {BEGIN: set()}
|
self.edges = {BEGIN: set()}
|
||||||
|
|||||||
0
docs/guide/packaging.rst
Normal file
0
docs/guide/packaging.rst
Normal file
@ -4,7 +4,54 @@ Part 2: Writing ETL Jobs
|
|||||||
What's an ETL job ?
|
What's an ETL job ?
|
||||||
:::::::::::::::::::
|
:::::::::::::::::::
|
||||||
|
|
||||||
In |bonobo|, an ETL job is a formal definition of an executable graph.
|
In |bonobo|, an ETL job is a single graph that can be executed on its own.
|
||||||
|
|
||||||
|
Within a graph, each node are isolated and can only communicate using their
|
||||||
|
input and output queues. For each input row, a given node will be called with
|
||||||
|
the row passed as arguments. Each *return* or *yield* value will be put on the
|
||||||
|
node's output queue, and the nodes connected in the graph will then be able to
|
||||||
|
process it.
|
||||||
|
|
||||||
|
|bonobo| is a line-by-line data stream processing solution.
|
||||||
|
|
||||||
|
Handling the data-flow this way brings the following properties:
|
||||||
|
|
||||||
|
- **First in, first out**: unless stated otherwise, each node will receeive the
|
||||||
|
rows from FIFO queues, and so, the order of rows will be preserved. That is
|
||||||
|
true for each single node, but please note that if you define "graph bubbles"
|
||||||
|
(where a graph diverge in different branches then converge again), the
|
||||||
|
convergence node will receive rows FIFO from each input queue, meaning that
|
||||||
|
the order existing at the divergence point wont stay true at the convergence
|
||||||
|
point.
|
||||||
|
|
||||||
|
- **Parallelism**: each node run in parallel (by default, using independant
|
||||||
|
threads). This is useful as you don't have to worry about blocking calls.
|
||||||
|
If a thread waits for, let's say, a database, or a network service, the other
|
||||||
|
nodes will continue handling data, as long as they have input rows available.
|
||||||
|
|
||||||
|
- **Independance**: the rows are independant from each other, making this way
|
||||||
|
of working with data flows good for line-by-line data processing, but
|
||||||
|
also not ideal for "grouped" computations (where an output depends on more
|
||||||
|
than one line of input data). You can overcome this with rolling windows if
|
||||||
|
the input required are adjacent rows, but if you need to work on the whole
|
||||||
|
dataset at once, you should consider other software.
|
||||||
|
|
||||||
|
Graphs are defined using :class:`bonobo.Graph` instances, as seen in the
|
||||||
|
previous tutorial step.
|
||||||
|
|
||||||
|
What can be a node?
|
||||||
|
:::::::::::::::::::
|
||||||
|
|
||||||
|
**TL;DR**: … anything, as long as it’s callable().
|
||||||
|
|
||||||
|
Functions
|
||||||
|
---------
|
||||||
|
|
||||||
|
.. code-block:: python
|
||||||
|
|
||||||
|
def get_item(id):
|
||||||
|
return id, items.get(id)
|
||||||
|
|
||||||
|
|
||||||
Each node of a graph will be executed in isolation from the other nodes, and the data is passed from one node to the
|
Each node of a graph will be executed in isolation from the other nodes, and the data is passed from one node to the
|
||||||
next using FIFO queues, managed by the framework. It's transparent to the end-user, though, and you'll only use
|
next using FIFO queues, managed by the framework. It's transparent to the end-user, though, and you'll only use
|
||||||
|
|||||||
Reference in New Issue
Block a user