Merge pull request #335 from hartym/develop

Forks & Merges
This commit is contained in:
Romain Dorgueil
2019-06-02 08:56:46 +02:00
committed by GitHub
6 changed files with 170 additions and 48 deletions

View File

@ -1,3 +1,4 @@
import inspect
import os
from jinja2 import DictLoader, Environment
@ -30,8 +31,6 @@ class Module:
return os.path.join(__path__, apidoc_root, *self.name.split(".")) + ".rst"
import inspect
bonobo = __import__("bonobo")
assert bonobo.__version__

View File

@ -59,7 +59,11 @@ class PartialGraph:
class Graph:
"""
Represents a directed graph of nodes.
Core structure representing a directed graph of nodes. It will be used to create data streaming queues between your
objects during the job execution.
This is how the data flows are defined.
"""
name = ""
@ -75,7 +79,9 @@ class Graph:
yield from self.nodes
def __len__(self):
"""Node count.
"""
The graph length is defined as its node count.
"""
return len(self.nodes)
@ -92,8 +98,19 @@ class Graph:
return self.get_cursor().__rshift__(other)
def get_cursor(self, ref=BEGIN):
"""
Create a `GraphCursor` to use the operator-based syntax to build graph, starting at `ref`.
"""
return GraphCursor(self, last=self.index_of(ref))
def orphan(self):
"""
Create a `GraphCursor` attached to nothing.
"""
return self.get_cursor(None)
def index_of(self, mixed):
"""
Find the index based on various strategies for a node, probably an input or output of chain. Supported
@ -115,10 +132,16 @@ class Graph:
raise ValueError("Cannot find node matching {!r}.".format(mixed))
def indexes_of(self, *things):
"""
Returns the set of indexes of the things passed as arguments.
"""
return set(map(self.index_of, things))
def outputs_of(self, idx_or_node, create=False):
"""Get a set of the outputs for a given node, node index or name.
"""
Get a set of the outputs for a given node, node index or name.
"""
idx_or_node = self.index_of(idx_or_node)
@ -127,8 +150,10 @@ class Graph:
return self.edges[idx_or_node]
def add_node(self, new_node, *, _name=None):
"""Add a node without connections in this graph and returns its index.
"""
Add a node without connections in this graph and returns its index.
If _name is specified, name this node (string reference for further usage).
"""
idx = len(self.nodes)
self.edges[idx] = set()
@ -149,7 +174,8 @@ class Graph:
return self.add_node(new_node, _name=_name)
def add_chain(self, *nodes, _input=BEGIN, _output=None, _name=None, use_existing_nodes=False):
"""Add `nodes` as a chain in this graph.
"""
Add `nodes` as a chain in this graph.
**Input rules**
@ -222,7 +248,9 @@ class Graph:
@property
def topologically_sorted_indexes(self):
"""Iterate in topological order, based on networkx's topological_sort() function.
"""
Iterate in topological order, based on networkx's topological_sort() function.
"""
try:
return self._topologcally_sorted_indexes_cache

View File

@ -5,7 +5,7 @@ import os
import runpy
import sys
from contextlib import contextmanager, redirect_stderr, redirect_stdout
from unittest.mock import patch
from unittest.mock import patch, sentinel
import pytest
@ -14,6 +14,7 @@ from bonobo.commands import entrypoint
from bonobo.execution.contexts.graph import GraphExecutionContext
from bonobo.execution.contexts.node import NodeExecutionContext
from bonobo.structs.tokens import Token
from bonobo.util import tuplize
@contextmanager
@ -26,6 +27,11 @@ def optional_contextmanager(cm, *, ignore=False):
class FilesystemTester:
"""
Helper that create temporary filesystem service to be used in unit tests.
"""
def __init__(self, extension="txt", mode="w", *, input_data=""):
self.extension = extension
self.input_data = input_data
@ -43,6 +49,12 @@ class FilesystemTester:
class QueueList(list):
"""
A list that behave like a queue (or is it the oposite?).
The datastructure is not smart at all, but it's quite useful for testing.
"""
def append(self, item):
if not isinstance(item, Token):
super(QueueList, self).append(item)
@ -51,6 +63,11 @@ class QueueList(list):
class BufferingContext:
"""
Base class to add a buffer to a context.
"""
def __init__(self, buffer=None):
if buffer is None:
buffer = QueueList()
@ -64,12 +81,22 @@ class BufferingContext:
class BufferingNodeExecutionContext(BufferingContext, NodeExecutionContext):
"""
Node execution context that actually stores the node outputs in a buffer, so one can test it afterward.
"""
def __init__(self, *args, buffer=None, **kwargs):
BufferingContext.__init__(self, buffer)
NodeExecutionContext.__init__(self, *args, **kwargs, _outputs=[self.buffer])
class BufferingGraphExecutionContext(BufferingContext, GraphExecutionContext):
"""
Graph execution context that uses buffering node execution contexts, all nodes buffering to the same buffer.
"""
NodeExecutionContextType = BufferingNodeExecutionContext
def __init__(self, *args, buffer=None, **kwargs):
@ -99,13 +126,13 @@ def runner(f):
@runner
def runner_entrypoint(args):
""" Run bonobo using the python command entrypoint directly (bonobo.commands.entrypoint). """
"""Run bonobo using the python command entrypoint directly (bonobo.commands.entrypoint). """
return entrypoint(args)
@runner
def runner_module(args):
""" Run bonobo using the bonobo.__main__ file, which is equivalent as doing "python -m bonobo ..."."""
"""Run bonobo using the bonobo.__main__ file, which is equivalent as doing "python -m bonobo ..."."""
with patch.object(sys, "argv", ["bonobo", *args]):
return runpy.run_path(__main__.__file__, run_name="__main__")
@ -192,7 +219,10 @@ class ConfigurableNodeTest:
class ReaderTest(ConfigurableNodeTest):
""" Helper class to test reader transformations. """
"""
Helper class to test reader transformations.
"""
ReaderNodeType = None
@ -232,7 +262,10 @@ class ReaderTest(ConfigurableNodeTest):
class WriterTest(ConfigurableNodeTest):
""" Helper class to test writer transformations. """
"""
Helper class to test writer transformations.
"""
WriterNodeType = None
@ -255,3 +288,15 @@ class WriterTest(ConfigurableNodeTest):
def readlines(self):
with self.fs.open(self.filename) as fp:
return tuple(map(str.strip, fp.readlines()))
@tuplize
def get_pseudo_nodes(*names):
"""
Generates a serie of named sentinels to test graph APIs.
>>> a, b, c = get_pseudo_nodes(*"abc")
"""
for name in names:
yield getattr(sentinel, name)

View File

@ -201,8 +201,11 @@ positional parameters as you want.
.. note::
As of |bonobo| 0.7, a new syntax is available that we believe is more powerfull and more readable than the legacy
`add_chain` method. The former API is here to stay and it's perfectly safe to use it, but if it is an option, you
should consider the new syntax. During the transition period, we'll document both.
`add_chain` method. The former API is here to stay and it's perfectly safe to use it (in fact, the new syntax uses
`add_chain` under the hood).
If it is an option for you, we suggest you consider the new syntax. During the transition period, we'll document
both but the new syntax will eventually become default.
.. code-block:: python
@ -393,6 +396,33 @@ You can also create single nodes, and the api provide the same capability on sin
graph.add_chain(..., _output="foo")
Orphan nodes / chains
:::::::::::::::::::::
The default behaviour of `add_chain` (or `get_cursor`) is to connect the first node to the special `BEGIN` token, which
instruct |bonobo| to call the connected node once without parameter to kickstart the data stream.
This is normally what you want, but there are ways to override it, as you may want to add "orphan" nodes or chains to your graph.
.. code-block:: python
import bonobo
graph = bonobo.Graph()
# using add_node will naturally add a node as "orphan"
graph.add_node(a)
# using add_chain with "None" as the input will create an orphan chain
graph.add_chain(a, b, c, _input=None)
# using the new syntax, you can use either get_cursor(None) or the orphan() shortcut
graph.get_cursor(None) >> a >> b >> c
# ... using the shortcut ...
graph.orphan() >> a >> b >> c
Connecting two nodes
::::::::::::::::::::

View File

@ -4,6 +4,7 @@ import pytest
from bonobo.constants import BEGIN
from bonobo.structs.graphs import Graph
from bonobo.util.testing import get_pseudo_nodes
identity = lambda x: x
@ -26,19 +27,21 @@ def test_graph_outputs_of():
def test_graph_index_of():
g = Graph()
g.add_node(sentinel.foo)
g.add_node(sentinel.bar)
foo, bar, not_there = get_pseudo_nodes("foo", "bar", "not_there")
g.add_node(foo)
g.add_node(bar)
# sequential, can resolve objects
assert g.index_of(sentinel.foo) == 0
assert g.index_of(sentinel.bar) == 1
assert g.index_of(foo) == 0
assert g.index_of(bar) == 1
# calling on an index should return the index
assert g.index_of(sentinel.bar) == g.index_of(g.index_of(sentinel.bar))
assert g.index_of(bar) == g.index_of(g.index_of(bar))
# not existing should raise value error
with pytest.raises(ValueError):
g.index_of(sentinel.not_there)
g.index_of(not_there)
# tokens resolve to themselves
assert g.index_of(BEGIN) == BEGIN
@ -58,15 +61,16 @@ def test_graph_add_component():
def test_invalid_graph_usage():
g = Graph()
foo, bar = get_pseudo_nodes("foo", "bar")
with pytest.raises(ValueError):
g.add_chain()
g.add_node(sentinel.foo)
g.add_node(sentinel.bar)
g.add_node(foo)
g.add_node(bar)
with pytest.raises(RuntimeError):
g.add_chain(_input=sentinel.bar, _output=sentinel.foo, _name="this_is_not_possible")
g.add_chain(_input=bar, _output=foo, _name="this_is_not_possible")
def test_graph_add_chain():
@ -81,48 +85,51 @@ def test_graph_add_chain():
def test_graph_topological_sort():
g = Graph()
a1, a2, a3, b1, b2 = get_pseudo_nodes("a1", "a2", "a3", "b1", "b2")
g.add_chain(sentinel.a1, sentinel.a2, sentinel.a3, _input=None, _output=None)
g.add_chain(a1, a2, a3, _input=None, _output=None)
assert g.topologically_sorted_indexes == (0, 1, 2)
assert g[0] == sentinel.a1
assert g[1] == sentinel.a2
assert g[2] == sentinel.a3
assert g[0] == a1
assert g[1] == a2
assert g[2] == a3
g.add_chain(sentinel.b1, sentinel.b2, _output=sentinel.a2)
g.add_chain(b1, b2, _output=a2)
assert g.topologically_sorted_indexes[-2:] == (1, 2)
assert g.topologically_sorted_indexes.index(3) < g.topologically_sorted_indexes.index(4)
assert g[3] == sentinel.b1
assert g[4] == sentinel.b2
assert g[3] == b1
assert g[4] == b2
def test_connect_two_chains():
g = Graph()
a1, a2, b1, b2 = get_pseudo_nodes("a1", "a2", "b1", "b2")
g.add_chain(sentinel.a1, sentinel.a2, _input=None, _output=None)
g.add_chain(sentinel.b1, sentinel.b2, _input=None, _output=None)
assert len(g.outputs_of(sentinel.a2)) == 0
g.add_chain(a1, a2, _input=None, _output=None)
g.add_chain(b1, b2, _input=None, _output=None)
assert len(g.outputs_of(a2)) == 0
g.add_chain(_input=sentinel.a2, _output=sentinel.b1)
assert g.outputs_of(sentinel.a2) == {g.index_of(sentinel.b1)}
g.add_chain(_input=a2, _output=b1)
assert g.outputs_of(a2) == g.indexes_of(b1)
def test_connect_two_anonymous_nodes():
g = Graph()
a, b = get_pseudo_nodes(*"ab")
# Create two "anonymous" nodes
g.add_node(sentinel.a)
g.add_node(sentinel.b)
g.add_node(a)
g.add_node(b)
# Connect them
g.add_chain(_input=sentinel.a, _output=sentinel.b)
g.add_chain(_input=a, _output=b)
def test_named_nodes():
g = Graph()
a, b, c, d, e, f = sentinel.a, sentinel.b, sentinel.c, sentinel.d, sentinel.e, sentinel.f
a, b, c, d, e, f = get_pseudo_nodes(*"abcdef")
# Here we mark _input to None, so normalize won't get the "begin" impulsion.
g.add_chain(e, f, _input=None, _name="load")

View File

@ -1,17 +1,10 @@
from operator import attrgetter
from unittest.mock import sentinel
import pytest
from bonobo.constants import BEGIN
from bonobo.structs.graphs import Graph, GraphCursor
from bonobo.util import tuplize
@tuplize
def get_pseudo_nodes(*names):
for name in names:
yield getattr(sentinel, name)
from bonobo.util.testing import get_pseudo_nodes
def test_get_cursor():
@ -127,3 +120,23 @@ def test_cursor_merge():
assert g.outputs_of(c) == set()
assert c1 == c2
def test_cursor_merge_orphan_in_between():
a, b, c, v, w, x, y = get_pseudo_nodes(*"abcdefg")
g = Graph()
g >> a >> b >> c
assert len(g) == 3
g.orphan() >> v >> w >> b
assert len(g) == 5
g.orphan() >> x >> y >> b
assert len(g) == 7
assert g.outputs_of(BEGIN) == g.indexes_of(a)
assert g.outputs_of(a) == g.indexes_of(b)
assert g.outputs_of(b) == g.indexes_of(c)
assert g.outputs_of(c) == set()
assert g.outputs_of(v) == g.indexes_of(w)
assert g.outputs_of(w) == g.indexes_of(b)
assert g.outputs_of(x) == g.indexes_of(y)
assert g.outputs_of(y) == g.indexes_of(b)