Merge pull request #333 from hartym/314_connect_two_chains

Implements #314, make index_of() public in graph api, document node c…
This commit is contained in:
Romain Dorgueil
2019-06-01 11:26:34 +02:00
committed by GitHub
7 changed files with 215 additions and 116 deletions

View File

@ -8,50 +8,13 @@
import sys
from pathlib import Path
from bonobo._api import (
CsvReader,
CsvWriter,
FileReader,
FileWriter,
Filter,
FixedWindow,
Format,
Graph,
JsonReader,
JsonWriter,
LdjsonReader,
LdjsonWriter,
Limit,
MapFields,
OrderFields,
PickleReader,
PickleWriter,
PrettyPrinter,
RateLimited,
Rename,
SetFields,
Tee,
UnpackItems,
__all__,
__doc__,
count,
create_reader,
create_strategy,
create_writer,
get_argument_parser,
get_examples_path,
identity,
inspect,
noop,
open_examples_fs,
open_fs,
parse_args,
run,
)
from bonobo._version import __version__
from bonobo._api import *
from bonobo._api import __all__, __doc__
from bonobo._api import (
CsvReader, CsvWriter, FileReader, FileWriter, Filter, FixedWindow, Format, Graph, JsonReader, JsonWriter,
LdjsonReader, LdjsonWriter, Limit, MapFields, OrderFields, PickleReader, PickleWriter, PrettyPrinter, RateLimited,
Rename, SetFields, Tee, UnpackItems, __all__, __doc__, count, create_reader, create_strategy, create_writer,
get_argument_parser, get_examples_path, identity, inspect, noop, open_examples_fs, open_fs, parse_args, run
)
from bonobo._version import __version__
if sys.version_info < (3, 5):

View File

@ -2,13 +2,13 @@ from logging import getLogger
from types import GeneratorType
from colorama import Back, Fore, Style
from django.core.management import BaseCommand
from django.core.management.base import OutputWrapper
from mondrian import term
import bonobo
from bonobo.plugins.console import ConsoleOutputPlugin
from bonobo.util.term import CLEAR_EOL
from django.core.management import BaseCommand
from django.core.management.base import OutputWrapper
from .utils import create_or_update

View File

@ -7,9 +7,7 @@ at home if you want to give it a shot.
"""
from bonobo.execution.strategies.executor import (
AsyncThreadPoolExecutorStrategy,
ProcessPoolExecutorStrategy,
ThreadPoolExecutorStrategy,
AsyncThreadPoolExecutorStrategy, ProcessPoolExecutorStrategy, ThreadPoolExecutorStrategy
)
from bonobo.execution.strategies.naive import NaiveStrategy

View File

@ -59,13 +59,14 @@ class Graph:
self.edges = {BEGIN: set()}
self.named = {}
self.nodes = []
self.add_chain(*chain)
if len(chain):
self.add_chain(*chain)
def __iter__(self):
yield from self.nodes
def __len__(self):
""" Node count.
"""Node count.
"""
return len(self.nodes)
@ -73,51 +74,112 @@ class Graph:
return self.nodes[key]
def get_cursor(self, ref=BEGIN):
return GraphCursor(self, last=self._resolve_index(ref))
return GraphCursor(self, last=self.index_of(ref))
def outputs_of(self, idx, create=False):
""" Get a set of the outputs for a given node index.
def index_of(self, mixed):
"""
if create and not idx in self.edges:
self.edges[idx] = set()
return self.edges[idx]
Find the index based on various strategies for a node, probably an input or output of chain. Supported
inputs are indexes, node values or names.
def add_node(self, c):
""" Add a node without connections in this graph and returns its index.
"""
if mixed is None:
return None
if type(mixed) is int or mixed in self.edges:
return mixed
if isinstance(mixed, str) and mixed in self.named:
return self.named[mixed]
if mixed in self.nodes:
return self.nodes.index(mixed)
raise ValueError("Cannot find node matching {!r}.".format(mixed))
def outputs_of(self, idx_or_node, create=False):
"""Get a set of the outputs for a given node, node index or name.
"""
idx_or_node = self.index_of(idx_or_node)
if create and not idx_or_node in self.edges:
self.edges[idx_or_node] = set()
return self.edges[idx_or_node]
def add_node(self, c, *, _name=None):
"""Add a node without connections in this graph and returns its index.
If _name is specified, name this node (string reference for further usage).
"""
idx = len(self.nodes)
self.edges[idx] = set()
self.nodes.append(c)
if _name:
if _name in self.named:
raise KeyError("Duplicate name {!r} in graph.".format(_name))
self.named[_name] = idx
return idx
def add_chain(self, *nodes, _input=BEGIN, _output=None, _name=None):
""" Add a chain in this graph.
"""Add `nodes` as a chain in this graph.
**Input rules**
* By default, this chain will be connected to `BEGIN`, a.k.a the special node that kickstarts transformations.
* If `_input` is set to `None`, then this chain won't receive any input unless you connect it manually to
something.
* If `_input` is something that can resolve to another node using `index_of` rules, then the chain will
receive the output stream of referenced node.
**Output rules**
* By default, this chain won't send its output anywhere. This is, most of the time, what you want.
* If `_output` is set to something (that can resolve to a node), then the last node in the chain will send its
outputs to the given node. This means you can provide an object, a name, or an index.
**Naming**
* If a `_name` is given, the first node in the chain will be named this way (same effect as providing a `_name`
to add_node).
**Special cases**
* You can use this method to connect two other chains (in fact, two nodes) by not giving any `nodes`, but
still providing values to `_input` and `_output`.
"""
if len(nodes):
_input = self._resolve_index(_input)
_output = self._resolve_index(_output)
_first = None
_last = None
_input = self.index_of(_input)
_output = self.index_of(_output)
_first = None
_last = None
for i, node in enumerate(nodes):
_last = self.add_node(node)
if not i and _name:
if _name in self.named:
raise KeyError("Duplicate name {!r} in graph.".format(_name))
self.named[_name] = _last
if _first is None:
_first = _last
self.outputs_of(_input, create=True).add(_last)
_input = _last
# Sanity checks.
if not len(nodes):
if _input is None or _output is None:
raise ValueError(
"Using add_chain(...) without nodes is only possible if you provide both _input and _output values."
)
if _output is not None:
self.outputs_of(_input, create=True).add(_output)
if _name is not None:
raise RuntimeError("Using add_chain(...) without nodes does not allow to use the _name parameter.")
if hasattr(self, "_topologcally_sorted_indexes_cache"):
del self._topologcally_sorted_indexes_cache
for i, node in enumerate(nodes):
_last = self.add_node(node, _name=_name if not i else None)
return GraphRange(self, _first, _last)
return GraphRange(self, None, None)
if _first is None:
_first = _last
self.outputs_of(_input, create=True).add(_last)
_input = _last
if _output is not None:
self.outputs_of(_input, create=True).add(_output)
if hasattr(self, "_topologcally_sorted_indexes_cache"):
del self._topologcally_sorted_indexes_cache
return GraphRange(self, _first, _last)
def copy(self):
g = Graph()
@ -191,26 +253,6 @@ class Graph:
except (ExecutableNotFound, FileNotFoundError) as exc:
return "<strong>{}</strong>: {}".format(type(exc).__name__, str(exc))
def _resolve_index(self, mixed):
"""
Find the index based on various strategies for a node, probably an input or output of chain. Supported
inputs are indexes, node values or names.
"""
if mixed is None:
return None
if type(mixed) is int or mixed in self.edges:
return mixed
if isinstance(mixed, str) and mixed in self.named:
return self.named[mixed]
if mixed in self.nodes:
return self.nodes.index(mixed)
raise ValueError("Cannot find node matching {!r}.".format(mixed))
def _get_graphviz_node_id(graph, i):
escaped_index = str(i)

View File

@ -6,15 +6,7 @@ and inspect transformations, graphs, and nodes.
from bonobo.util.collections import cast, ensure_tuple, sortedlist, tuplize
from bonobo.util.compat import deprecated, deprecated_alias
from bonobo.util.inspect import (
inspect_node,
isconfigurable,
isconfigurabletype,
iscontextprocessor,
isdict,
ismethod,
isoption,
istuple,
istype,
inspect_node, isconfigurable, isconfigurabletype, iscontextprocessor, isdict, ismethod, isoption, istuple, istype
)
from bonobo.util.objects import ValueHolder, get_attribute_or_create, get_name

View File

@ -213,10 +213,9 @@ Named nodes
Using above code to create convergences often leads to code which is hard to read, because you have to define the "target" stream
before the streams that logically goes to the beginning of the transformation graph. To overcome that, one can use
"named" nodes:
"named" nodes.
graph.add_chain(x, y, z, _name='zed')
graph.add_chain(f, g, h, _input='zed')
Please note that naming a chain is exactly the same thing as naming the first node of a chain.
.. code-block:: python
@ -224,13 +223,12 @@ before the streams that logically goes to the beginning of the transformation gr
graph = bonobo.Graph()
# Add two different chains
graph.add_chain(a, b, _output="load")
graph.add_chain(f, g, _output="load")
# Here we mark _input to None, so normalize won't get the "begin" impulsion.
graph.add_chain(normalize, store, _input=None, _name="load")
# Add two different chains that will output to the "load" node
graph.add_chain(a, b, _output="load")
graph.add_chain(f, g, _output="load")
Resulting graph:
@ -249,6 +247,43 @@ Resulting graph:
"normalize (load)" -> "store"
}
You can also create single nodes, and the api provide the same capability on single nodes.
.. code-block:: python
import bonobo
graph = bonobo.Graph()
# Create a node without any connection, name it.
graph.add_node(foo, _name="foo")
# Use it somewhere else as the data source.
graph.add_chain(..., _input="foo")
# ... or as the data sink.
graph.add_chain(..., _output="foo")
Connecting two nodes
::::::::::::::::::::
You may want to connect two nodes at some point. You can use `add_chain` without nodes to achieve it.
.. code-block:: python
import bonobo
graph = bonobo.Graph()
# Create two "anonymous" nodes
graph.add_node(a)
graph.add_node(b)
# Connect them
graph.add_chain(_input=a, _output=b)
Inspecting graphs
:::::::::::::::::

View File

@ -23,6 +23,27 @@ def test_graph_outputs_of():
assert len(g.outputs_of(0)) == 0
def test_graph_index_of():
g = Graph()
g.add_node(sentinel.foo)
g.add_node(sentinel.bar)
# sequential, can resolve objects
assert g.index_of(sentinel.foo) == 0
assert g.index_of(sentinel.bar) == 1
# calling on an index should return the index
assert g.index_of(sentinel.bar) == g.index_of(g.index_of(sentinel.bar))
# not existing should raise value error
with pytest.raises(ValueError):
g.index_of(sentinel.not_there)
# tokens resolve to themselves
assert g.index_of(BEGIN) == BEGIN
def test_graph_add_component():
g = Graph()
@ -35,6 +56,19 @@ def test_graph_add_component():
assert len(g.nodes) == 2
def test_invalid_graph_usage():
g = Graph()
with pytest.raises(ValueError):
g.add_chain()
g.add_node(sentinel.foo)
g.add_node(sentinel.bar)
with pytest.raises(RuntimeError):
g.add_chain(_input=sentinel.bar, _output=sentinel.foo, _name="this_is_not_possible")
def test_graph_add_chain():
g = Graph()
@ -63,6 +97,41 @@ def test_graph_topological_sort():
assert g[4] == sentinel.b2
def test_connect_two_chains():
g = Graph()
g.add_chain(sentinel.a1, sentinel.a2, _input=None, _output=None)
g.add_chain(sentinel.b1, sentinel.b2, _input=None, _output=None)
assert len(g.outputs_of(sentinel.a2)) == 0
g.add_chain(_input=sentinel.a2, _output=sentinel.b1)
assert g.outputs_of(sentinel.a2) == {g.index_of(sentinel.b1)}
def test_connect_two_anonymous_nodes():
g = Graph()
# Create two "anonymous" nodes
g.add_node(sentinel.a)
g.add_node(sentinel.b)
# Connect them
g.add_chain(_input=sentinel.a, _output=sentinel.b)
def test_named_nodes():
g = Graph()
a, b, c, d, e, f = sentinel.a, sentinel.b, sentinel.c, sentinel.d, sentinel.e, sentinel.f
# Here we mark _input to None, so normalize won't get the "begin" impulsion.
g.add_chain(e, f, _input=None, _name="load")
# Add two different chains
g.add_chain(a, b, _output="load")
g.add_chain(c, d, _output="load")
def test_copy():
g1 = Graph()
g2 = g1.copy()