Implements #314, make index_of() public in graph api, document node connections and let outputs_of() take anything resolvable by index_of().
This commit is contained in:
@ -8,50 +8,13 @@
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from bonobo._api import (
|
||||
CsvReader,
|
||||
CsvWriter,
|
||||
FileReader,
|
||||
FileWriter,
|
||||
Filter,
|
||||
FixedWindow,
|
||||
Format,
|
||||
Graph,
|
||||
JsonReader,
|
||||
JsonWriter,
|
||||
LdjsonReader,
|
||||
LdjsonWriter,
|
||||
Limit,
|
||||
MapFields,
|
||||
OrderFields,
|
||||
PickleReader,
|
||||
PickleWriter,
|
||||
PrettyPrinter,
|
||||
RateLimited,
|
||||
Rename,
|
||||
SetFields,
|
||||
Tee,
|
||||
UnpackItems,
|
||||
__all__,
|
||||
__doc__,
|
||||
count,
|
||||
create_reader,
|
||||
create_strategy,
|
||||
create_writer,
|
||||
get_argument_parser,
|
||||
get_examples_path,
|
||||
identity,
|
||||
inspect,
|
||||
noop,
|
||||
open_examples_fs,
|
||||
open_fs,
|
||||
parse_args,
|
||||
run,
|
||||
)
|
||||
from bonobo._version import __version__
|
||||
|
||||
from bonobo._api import *
|
||||
from bonobo._api import __all__, __doc__
|
||||
from bonobo._api import (
|
||||
CsvReader, CsvWriter, FileReader, FileWriter, Filter, FixedWindow, Format, Graph, JsonReader, JsonWriter,
|
||||
LdjsonReader, LdjsonWriter, Limit, MapFields, OrderFields, PickleReader, PickleWriter, PrettyPrinter, RateLimited,
|
||||
Rename, SetFields, Tee, UnpackItems, __all__, __doc__, count, create_reader, create_strategy, create_writer,
|
||||
get_argument_parser, get_examples_path, identity, inspect, noop, open_examples_fs, open_fs, parse_args, run
|
||||
)
|
||||
from bonobo._version import __version__
|
||||
|
||||
if sys.version_info < (3, 5):
|
||||
|
||||
@ -2,13 +2,13 @@ from logging import getLogger
|
||||
from types import GeneratorType
|
||||
|
||||
from colorama import Back, Fore, Style
|
||||
from django.core.management import BaseCommand
|
||||
from django.core.management.base import OutputWrapper
|
||||
from mondrian import term
|
||||
|
||||
import bonobo
|
||||
from bonobo.plugins.console import ConsoleOutputPlugin
|
||||
from bonobo.util.term import CLEAR_EOL
|
||||
from django.core.management import BaseCommand
|
||||
from django.core.management.base import OutputWrapper
|
||||
|
||||
from .utils import create_or_update
|
||||
|
||||
|
||||
@ -7,9 +7,7 @@ at home if you want to give it a shot.
|
||||
|
||||
"""
|
||||
from bonobo.execution.strategies.executor import (
|
||||
AsyncThreadPoolExecutorStrategy,
|
||||
ProcessPoolExecutorStrategy,
|
||||
ThreadPoolExecutorStrategy,
|
||||
AsyncThreadPoolExecutorStrategy, ProcessPoolExecutorStrategy, ThreadPoolExecutorStrategy
|
||||
)
|
||||
from bonobo.execution.strategies.naive import NaiveStrategy
|
||||
|
||||
|
||||
@ -59,13 +59,14 @@ class Graph:
|
||||
self.edges = {BEGIN: set()}
|
||||
self.named = {}
|
||||
self.nodes = []
|
||||
self.add_chain(*chain)
|
||||
if len(chain):
|
||||
self.add_chain(*chain)
|
||||
|
||||
def __iter__(self):
    """Iterate over the nodes stored in this graph, in the order of the node list."""
    for node in self.nodes:
        yield node
|
||||
|
||||
def __len__(self):
|
||||
""" Node count.
|
||||
"""Node count.
|
||||
"""
|
||||
return len(self.nodes)
|
||||
|
||||
@ -73,51 +74,112 @@ class Graph:
|
||||
return self.nodes[key]
|
||||
|
||||
def get_cursor(self, ref=BEGIN):
|
||||
return GraphCursor(self, last=self._resolve_index(ref))
|
||||
return GraphCursor(self, last=self.index_of(ref))
|
||||
|
||||
def outputs_of(self, idx, create=False):
|
||||
""" Get a set of the outputs for a given node index.
|
||||
def index_of(self, mixed):
|
||||
"""
|
||||
if create and not idx in self.edges:
|
||||
self.edges[idx] = set()
|
||||
return self.edges[idx]
|
||||
Find the index based on various strategies for a node, probably an input or output of chain. Supported
|
||||
inputs are indexes, node values or names.
|
||||
|
||||
def add_node(self, c):
|
||||
""" Add a node without connections in this graph and returns its index.
|
||||
"""
|
||||
if mixed is None:
|
||||
return None
|
||||
|
||||
if type(mixed) is int or mixed in self.edges:
|
||||
return mixed
|
||||
|
||||
if isinstance(mixed, str) and mixed in self.named:
|
||||
return self.named[mixed]
|
||||
|
||||
if mixed in self.nodes:
|
||||
return self.nodes.index(mixed)
|
||||
|
||||
raise ValueError("Cannot find node matching {!r}.".format(mixed))
|
||||
|
||||
def outputs_of(self, idx_or_node, create=False):
    """Get the set of outputs for a given node, node index or name.

    :param idx_or_node: anything `index_of` can resolve (an index, a node value or a name).
    :param create: if true, register an empty output set for the resolved index when it has none yet.
    :return: the set stored in `self.edges` for the resolved index (the live set, not a copy).
    :raises ValueError: propagated from `index_of` when the reference cannot be resolved.
    :raises KeyError: if the resolved index has no output set and `create` is false.
    """
    # Keep the resolved index in its own name instead of rebinding the parameter.
    idx = self.index_of(idx_or_node)

    if create and idx not in self.edges:
        self.edges[idx] = set()

    return self.edges[idx]
|
||||
|
||||
def add_node(self, c, *, _name=None):
    """Add a node without connections in this graph and return its index.

    If `_name` is specified, name this node (string reference for further usage).

    :param c: the node to append to the graph.
    :param _name: optional name under which the new index is registered in `self.named`.
    :return: the index of the newly added node.
    :raises KeyError: if `_name` is already registered; the graph is left unchanged in that case.
    """
    # Validate the name before mutating anything, so a rejected call does not leave an orphan node
    # (and its empty edge set) behind.
    if _name and _name in self.named:
        raise KeyError("Duplicate name {!r} in graph.".format(_name))

    idx = len(self.nodes)
    self.edges[idx] = set()
    self.nodes.append(c)

    if _name:
        self.named[_name] = idx

    return idx
|
||||
|
||||
def add_chain(self, *nodes, _input=BEGIN, _output=None, _name=None):
|
||||
""" Add a chain in this graph.
|
||||
"""Add `nodes` as a chain in this graph.
|
||||
|
||||
**Input rules**
|
||||
|
||||
* By default, this chain will be connected to `BEGIN`, a.k.a the special node that kickstarts transformations.
|
||||
* If `_input` is set to `None`, then this chain won't receive any input unless you connect it manually to
|
||||
something.
|
||||
* If `_input` is something that can resolve to another node using `index_of` rules, then the chain will
|
||||
receive the output stream of referenced node.
|
||||
|
||||
**Output rules**
|
||||
|
||||
* By default, this chain won't send its output anywhere. This is, most of the time, what you want.
|
||||
* If `_output` is set to something (that can resolve to a node), then the last node in the chain will send its
|
||||
outputs to the given node. This means you can provide an object, a name, or an index.
|
||||
|
||||
**Naming**
|
||||
|
||||
* If a `_name` is given, the first node in the chain will be named this way (same effect as providing a `_name`
|
||||
to add_node).
|
||||
|
||||
**Special cases**
|
||||
|
||||
* You can use this method to connect two other chains (in fact, two nodes) by not giving any `nodes`, but
|
||||
still providing values to `_input` and `_output`.
|
||||
|
||||
"""
|
||||
if len(nodes):
|
||||
_input = self._resolve_index(_input)
|
||||
_output = self._resolve_index(_output)
|
||||
_first = None
|
||||
_last = None
|
||||
_input = self.index_of(_input)
|
||||
_output = self.index_of(_output)
|
||||
_first = None
|
||||
_last = None
|
||||
|
||||
for i, node in enumerate(nodes):
|
||||
_last = self.add_node(node)
|
||||
if not i and _name:
|
||||
if _name in self.named:
|
||||
raise KeyError("Duplicate name {!r} in graph.".format(_name))
|
||||
self.named[_name] = _last
|
||||
if _first is None:
|
||||
_first = _last
|
||||
self.outputs_of(_input, create=True).add(_last)
|
||||
_input = _last
|
||||
# Sanity checks.
|
||||
if not len(nodes):
|
||||
if _input is None or _output is None:
|
||||
raise ValueError(
|
||||
"Using add_chain(...) without nodes is only possible if you provide both _input and _output values."
|
||||
)
|
||||
|
||||
if _output is not None:
|
||||
self.outputs_of(_input, create=True).add(_output)
|
||||
if _name is not None:
|
||||
raise RuntimeError("Using add_chain(...) without nodes does not allow to use the _name parameter.")
|
||||
|
||||
if hasattr(self, "_topologcally_sorted_indexes_cache"):
|
||||
del self._topologcally_sorted_indexes_cache
|
||||
for i, node in enumerate(nodes):
|
||||
_last = self.add_node(node, _name=_name if not i else None)
|
||||
|
||||
return GraphRange(self, _first, _last)
|
||||
return GraphRange(self, None, None)
|
||||
if _first is None:
|
||||
_first = _last
|
||||
|
||||
self.outputs_of(_input, create=True).add(_last)
|
||||
|
||||
_input = _last
|
||||
|
||||
if _output is not None:
|
||||
self.outputs_of(_input, create=True).add(_output)
|
||||
|
||||
if hasattr(self, "_topologcally_sorted_indexes_cache"):
|
||||
del self._topologcally_sorted_indexes_cache
|
||||
|
||||
return GraphRange(self, _first, _last)
|
||||
|
||||
def copy(self):
|
||||
g = Graph()
|
||||
@ -191,26 +253,6 @@ class Graph:
|
||||
except (ExecutableNotFound, FileNotFoundError) as exc:
|
||||
return "<strong>{}</strong>: {}".format(type(exc).__name__, str(exc))
|
||||
|
||||
def _resolve_index(self, mixed):
    """Find the index based on various strategies for a node, probably an input or output of chain.

    Supported inputs are indexes, node values or names. `None` is passed through unchanged.

    :param mixed: an index, a key of `self.edges`, a name registered in `self.named`, or a node value.
    :return: the resolved index, or `None` if `mixed` is `None`.
    :raises ValueError: if `mixed` matches none of the supported strategies.
    """
    if mixed is None:
        return None

    # Exact-type check: plain ints are returned as-is (no bounds check), int subclasses are not.
    if type(mixed) is int:
        return mixed

    # Non-int keys of the edge map resolve to themselves. An unhashable value can never be a dict
    # key, so skip this strategy instead of letting the membership test raise TypeError.
    try:
        if mixed in self.edges:
            return mixed
    except TypeError:
        pass

    if isinstance(mixed, str) and mixed in self.named:
        return self.named[mixed]

    # One scan of the node list instead of a membership test followed by index().
    try:
        return self.nodes.index(mixed)
    except ValueError:
        raise ValueError("Cannot find node matching {!r}.".format(mixed)) from None
|
||||
|
||||
|
||||
def _get_graphviz_node_id(graph, i):
|
||||
escaped_index = str(i)
|
||||
|
||||
@ -6,15 +6,7 @@ and inspect transformations, graphs, and nodes.
|
||||
from bonobo.util.collections import cast, ensure_tuple, sortedlist, tuplize
|
||||
from bonobo.util.compat import deprecated, deprecated_alias
|
||||
from bonobo.util.inspect import (
|
||||
inspect_node,
|
||||
isconfigurable,
|
||||
isconfigurabletype,
|
||||
iscontextprocessor,
|
||||
isdict,
|
||||
ismethod,
|
||||
isoption,
|
||||
istuple,
|
||||
istype,
|
||||
inspect_node, isconfigurable, isconfigurabletype, iscontextprocessor, isdict, ismethod, isoption, istuple, istype
|
||||
)
|
||||
from bonobo.util.objects import ValueHolder, get_attribute_or_create, get_name
|
||||
|
||||
|
||||
@ -213,10 +213,9 @@ Named nodes
|
||||
|
||||
Using the above code to create convergences often leads to code which is hard to read, because you have to define the "target" stream
|
||||
before the streams that logically go at the beginning of the transformation graph. To overcome that, one can use
|
||||
"named" nodes:
|
||||
"named" nodes.
|
||||
|
||||
graph.add_chain(x, y, z, _name='zed')
|
||||
graph.add_chain(f, g, h, _input='zed')
|
||||
Please note that naming a chain is exactly the same thing as naming the first node of a chain.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
@ -224,13 +223,12 @@ before the streams that logically goes to the beginning of the transformation gr
|
||||
|
||||
graph = bonobo.Graph()
|
||||
|
||||
# Add two different chains
|
||||
graph.add_chain(a, b, _output="load")
|
||||
graph.add_chain(f, g, _output="load")
|
||||
|
||||
# Here we set _input to None, so normalize won't get the "begin" impulse.
|
||||
graph.add_chain(normalize, store, _input=None, _name="load")
|
||||
|
||||
# Add two different chains that will output to the "load" node
|
||||
graph.add_chain(a, b, _output="load")
|
||||
graph.add_chain(f, g, _output="load")
|
||||
|
||||
Resulting graph:
|
||||
|
||||
@ -249,6 +247,43 @@ Resulting graph:
|
||||
"normalize (load)" -> "store"
|
||||
}
|
||||
|
||||
You can also create single nodes, and the API provides the same naming capability for them.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
import bonobo
|
||||
|
||||
graph = bonobo.Graph()
|
||||
|
||||
# Create a node without any connection, name it.
|
||||
graph.add_node(foo, _name="foo")
|
||||
|
||||
# Use it somewhere else as the data source.
|
||||
graph.add_chain(..., _input="foo")
|
||||
|
||||
# ... or as the data sink.
|
||||
graph.add_chain(..., _output="foo")
|
||||
|
||||
|
||||
Connecting two nodes
|
||||
::::::::::::::::::::
|
||||
|
||||
You may want to connect two nodes at some point. You can use `add_chain` without nodes to achieve it.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
import bonobo
|
||||
|
||||
graph = bonobo.Graph()
|
||||
|
||||
# Create two "anonymous" nodes
|
||||
graph.add_node(a)
|
||||
graph.add_node(b)
|
||||
|
||||
# Connect them
|
||||
graph.add_chain(_input=a, _output=b)
|
||||
|
||||
|
||||
|
||||
Inspecting graphs
|
||||
:::::::::::::::::
|
||||
|
||||
@ -23,6 +23,27 @@ def test_graph_outputs_of():
|
||||
assert len(g.outputs_of(0)) == 0
|
||||
|
||||
|
||||
def test_graph_index_of():
    """Check that Graph.index_of resolves node values, indexes and tokens."""
    graph = Graph()

    graph.add_node(sentinel.foo)
    graph.add_node(sentinel.bar)

    # Node values resolve to the position they were added at.
    assert graph.index_of(sentinel.foo) == 0
    assert graph.index_of(sentinel.bar) == 1

    # Resolving an index that is already resolved gives the same index back.
    bar_index = graph.index_of(sentinel.bar)
    assert bar_index == graph.index_of(bar_index)

    # A value that was never added cannot be resolved.
    with pytest.raises(ValueError):
        graph.index_of(sentinel.not_there)

    # The BEGIN token resolves to itself.
    assert graph.index_of(BEGIN) == BEGIN
|
||||
|
||||
|
||||
def test_graph_add_component():
|
||||
g = Graph()
|
||||
|
||||
@ -35,6 +56,19 @@ def test_graph_add_component():
|
||||
assert len(g.nodes) == 2
|
||||
|
||||
|
||||
def test_invalid_graph_usage():
    """Check that add_chain calls without nodes reject unusable argument combinations."""
    graph = Graph()

    # Without nodes and without an _output there is nothing to connect.
    with pytest.raises(ValueError):
        graph.add_chain()

    for node in (sentinel.foo, sentinel.bar):
        graph.add_node(node)

    # A call without nodes cannot be given a name.
    with pytest.raises(RuntimeError):
        graph.add_chain(_input=sentinel.bar, _output=sentinel.foo, _name="this_is_not_possible")
|
||||
|
||||
|
||||
def test_graph_add_chain():
|
||||
g = Graph()
|
||||
|
||||
@ -63,6 +97,41 @@ def test_graph_topological_sort():
|
||||
assert g[4] == sentinel.b2
|
||||
|
||||
|
||||
def test_connect_two_chains():
    """Check that add_chain without nodes links the end of one chain to the start of another."""
    graph = Graph()

    # Two chains created with neither an input nor an output.
    for chain in ((sentinel.a1, sentinel.a2), (sentinel.b1, sentinel.b2)):
        graph.add_chain(*chain, _input=None, _output=None)
    assert len(graph.outputs_of(sentinel.a2)) == 0

    # Link the tail of the first chain to the head of the second one.
    graph.add_chain(_input=sentinel.a2, _output=sentinel.b1)
    outputs = graph.outputs_of(sentinel.a2)
    assert outputs == {graph.index_of(sentinel.b1)}
|
||||
|
||||
|
||||
def test_connect_two_anonymous_nodes():
    """Smoke test: two unnamed nodes can be connected by passing the node values to add_chain."""
    graph = Graph()

    # Register both nodes without naming them.
    for node in (sentinel.a, sentinel.b):
        graph.add_node(node)

    # Connect the first to the second, referencing them by value.
    graph.add_chain(_input=sentinel.a, _output=sentinel.b)
|
||||
|
||||
|
||||
def test_named_nodes():
    """Smoke test: chains can reference a named node through _output without raising."""
    graph = Graph()

    a, b, c, d, e, f = (getattr(sentinel, letter) for letter in "abcdef")

    # _input=None: the named chain is not fed by BEGIN.
    graph.add_chain(e, f, _input=None, _name="load")

    # Two separate chains, both sending their output to the node named "load".
    graph.add_chain(a, b, _output="load")
    graph.add_chain(c, d, _output="load")
|
||||
|
||||
|
||||
def test_copy():
|
||||
g1 = Graph()
|
||||
g2 = g1.copy()
|
||||
|
||||
Reference in New Issue
Block a user