diff --git a/bonobo/__init__.py b/bonobo/__init__.py index 181e64a..076a7f1 100644 --- a/bonobo/__init__.py +++ b/bonobo/__init__.py @@ -8,50 +8,13 @@ import sys from pathlib import Path -from bonobo._api import ( - CsvReader, - CsvWriter, - FileReader, - FileWriter, - Filter, - FixedWindow, - Format, - Graph, - JsonReader, - JsonWriter, - LdjsonReader, - LdjsonWriter, - Limit, - MapFields, - OrderFields, - PickleReader, - PickleWriter, - PrettyPrinter, - RateLimited, - Rename, - SetFields, - Tee, - UnpackItems, - __all__, - __doc__, - count, - create_reader, - create_strategy, - create_writer, - get_argument_parser, - get_examples_path, - identity, - inspect, - noop, - open_examples_fs, - open_fs, - parse_args, - run, -) -from bonobo._version import __version__ - from bonobo._api import * -from bonobo._api import __all__, __doc__ +from bonobo._api import ( + CsvReader, CsvWriter, FileReader, FileWriter, Filter, FixedWindow, Format, Graph, JsonReader, JsonWriter, + LdjsonReader, LdjsonWriter, Limit, MapFields, OrderFields, PickleReader, PickleWriter, PrettyPrinter, RateLimited, + Rename, SetFields, Tee, UnpackItems, __all__, __doc__, count, create_reader, create_strategy, create_writer, + get_argument_parser, get_examples_path, identity, inspect, noop, open_examples_fs, open_fs, parse_args, run +) from bonobo._version import __version__ if sys.version_info < (3, 5): diff --git a/bonobo/contrib/django/commands.py b/bonobo/contrib/django/commands.py index 4974540..e96eac4 100644 --- a/bonobo/contrib/django/commands.py +++ b/bonobo/contrib/django/commands.py @@ -2,13 +2,13 @@ from logging import getLogger from types import GeneratorType from colorama import Back, Fore, Style -from django.core.management import BaseCommand -from django.core.management.base import OutputWrapper from mondrian import term import bonobo from bonobo.plugins.console import ConsoleOutputPlugin from bonobo.util.term import CLEAR_EOL +from django.core.management import BaseCommand +from django.core.management.base import OutputWrapper from .utils import create_or_update diff --git a/bonobo/execution/strategies/__init__.py b/bonobo/execution/strategies/__init__.py index fc802ad..5995bf0 100644 --- a/bonobo/execution/strategies/__init__.py +++ b/bonobo/execution/strategies/__init__.py @@ -7,9 +7,7 @@ at home if you want to give it a shot. """ from bonobo.execution.strategies.executor import ( - AsyncThreadPoolExecutorStrategy, - ProcessPoolExecutorStrategy, - ThreadPoolExecutorStrategy, + AsyncThreadPoolExecutorStrategy, ProcessPoolExecutorStrategy, ThreadPoolExecutorStrategy ) from bonobo.execution.strategies.naive import NaiveStrategy diff --git a/bonobo/structs/graphs.py b/bonobo/structs/graphs.py index 10f88f4..8d256de 100644 --- a/bonobo/structs/graphs.py +++ b/bonobo/structs/graphs.py @@ -59,13 +59,14 @@ class Graph: self.edges = {BEGIN: set()} self.named = {} self.nodes = [] - self.add_chain(*chain) + if len(chain): + self.add_chain(*chain) def __iter__(self): yield from self.nodes def __len__(self): - """ Node count. + """Node count. """ return len(self.nodes) @@ -73,51 +74,112 @@ class Graph: return self.nodes[key] def get_cursor(self, ref=BEGIN): - return GraphCursor(self, last=self._resolve_index(ref)) + return GraphCursor(self, last=self.index_of(ref)) - def outputs_of(self, idx, create=False): - """ Get a set of the outputs for a given node index. + def index_of(self, mixed): """ - if create and not idx in self.edges: - self.edges[idx] = set() - return self.edges[idx] + Find the index based on various strategies for a node, probably an input or output of chain. Supported + inputs are indexes, node values or names. - def add_node(self, c): - """ Add a node without connections in this graph and returns its index. + """ + if mixed is None: + return None + + if type(mixed) is int or mixed in self.edges: + return mixed + + if isinstance(mixed, str) and mixed in self.named: + return self.named[mixed] + + if mixed in self.nodes: + return self.nodes.index(mixed) + + raise ValueError("Cannot find node matching {!r}.".format(mixed)) + + def outputs_of(self, idx_or_node, create=False): + """Get a set of the outputs for a given node, node index or name. + """ + idx_or_node = self.index_of(idx_or_node) + + if create and not idx_or_node in self.edges: + self.edges[idx_or_node] = set() + return self.edges[idx_or_node] + + def add_node(self, c, *, _name=None): + """Add a node without connections in this graph and returns its index. + If _name is specified, name this node (string reference for further usage). """ idx = len(self.nodes) self.edges[idx] = set() self.nodes.append(c) + + if _name: + if _name in self.named: + raise KeyError("Duplicate name {!r} in graph.".format(_name)) + self.named[_name] = idx + return idx def add_chain(self, *nodes, _input=BEGIN, _output=None, _name=None): - """ Add a chain in this graph. + """Add `nodes` as a chain in this graph. + + **Input rules** + + * By default, this chain will be connected to `BEGIN`, a.k.a the special node that kickstarts transformations. + * If `_input` is set to `None`, then this chain won't receive any input unless you connect it manually to + something. + * If `_input` is something that can resolve to another node using `index_of` rules, then the chain will + receive the output stream of referenced node. + + **Output rules** + + * By default, this chain won't send its output anywhere. This is, most of the time, what you want. + * If `_output` is set to something (that can resolve to a node), then the last node in the chain will send its + outputs to the given node. This means you can provide an object, a name, or an index. + + **Naming** + + * If a `_name` is given, the first node in the chain will be named this way (same effect as providing a `_name` + to add_node). + + **Special cases** + + * You can use this method to connect two other chains (in fact, two nodes) by not giving any `nodes`, but + still providing values to `_input` and `_output`. + """ - if len(nodes): - _input = self._resolve_index(_input) - _output = self._resolve_index(_output) - _first = None - _last = None + _input = self.index_of(_input) + _output = self.index_of(_output) + _first = None + _last = None - for i, node in enumerate(nodes): - _last = self.add_node(node) - if not i and _name: - if _name in self.named: - raise KeyError("Duplicate name {!r} in graph.".format(_name)) - self.named[_name] = _last - if _first is None: - _first = _last - self.outputs_of(_input, create=True).add(_last) - _input = _last + # Sanity checks. + if not len(nodes): + if _input is None or _output is None: + raise ValueError( + "Using add_chain(...) without nodes is only possible if you provide both _input and _output values." + ) - if _output is not None: - self.outputs_of(_input, create=True).add(_output) + if _name is not None: + raise RuntimeError("Using add_chain(...) without nodes does not allow to use the _name parameter.") - if hasattr(self, "_topologcally_sorted_indexes_cache"): - del self._topologcally_sorted_indexes_cache + for i, node in enumerate(nodes): + _last = self.add_node(node, _name=_name if not i else None) - return GraphRange(self, _first, _last) - return GraphRange(self, None, None) + if _first is None: + _first = _last + + self.outputs_of(_input, create=True).add(_last) + + _input = _last + + if _output is not None: + self.outputs_of(_input, create=True).add(_output) + + if hasattr(self, "_topologcally_sorted_indexes_cache"): + del self._topologcally_sorted_indexes_cache + + return GraphRange(self, _first, _last) def copy(self): g = Graph() @@ -191,26 +253,6 @@ class Graph: except (ExecutableNotFound, FileNotFoundError) as exc: return "{}: {}".format(type(exc).__name__, str(exc)) - def _resolve_index(self, mixed): - """ - Find the index based on various strategies for a node, probably an input or output of chain. Supported - inputs are indexes, node values or names. - - """ - if mixed is None: - return None - - if type(mixed) is int or mixed in self.edges: - return mixed - - if isinstance(mixed, str) and mixed in self.named: - return self.named[mixed] - - if mixed in self.nodes: - return self.nodes.index(mixed) - - raise ValueError("Cannot find node matching {!r}.".format(mixed)) - def _get_graphviz_node_id(graph, i): escaped_index = str(i) diff --git a/bonobo/util/__init__.py b/bonobo/util/__init__.py index b4be454..c3c33e2 100644 --- a/bonobo/util/__init__.py +++ b/bonobo/util/__init__.py @@ -6,15 +6,7 @@ and inspect transformations, graphs, and nodes. from bonobo.util.collections import cast, ensure_tuple, sortedlist, tuplize from bonobo.util.compat import deprecated, deprecated_alias from bonobo.util.inspect import ( - inspect_node, - isconfigurable, - isconfigurabletype, - iscontextprocessor, - isdict, - ismethod, - isoption, - istuple, - istype, + inspect_node, isconfigurable, isconfigurabletype, iscontextprocessor, isdict, ismethod, isoption, istuple, istype ) from bonobo.util.objects import ValueHolder, get_attribute_or_create, get_name diff --git a/docs/guide/graphs.rst b/docs/guide/graphs.rst index bdfc502..bad66f1 100644 --- a/docs/guide/graphs.rst +++ b/docs/guide/graphs.rst @@ -213,10 +213,9 @@ Named nodes Using above code to create convergences often leads to code which is hard to read, because you have to define the "target" stream before the streams that logically goes to the beginning of the transformation graph. To overcome that, one can use -"named" nodes: +"named" nodes. - graph.add_chain(x, y, z, _name='zed') - graph.add_chain(f, g, h, _input='zed') +Please note that naming a chain is exactly the same thing as naming the first node of a chain. .. code-block:: python @@ -224,13 +223,12 @@ before the streams that logically goes to the beginning of the transformation gr graph = bonobo.Graph() - # Add two different chains - graph.add_chain(a, b, _output="load") - graph.add_chain(f, g, _output="load") - # Here we mark _input to None, so normalize won't get the "begin" impulsion. graph.add_chain(normalize, store, _input=None, _name="load") + # Add two different chains that will output to the "load" node + graph.add_chain(a, b, _output="load") + graph.add_chain(f, g, _output="load") Resulting graph: @@ -249,6 +247,43 @@ Resulting graph: "normalize (load)" -> "store" } +You can also create single nodes, and the api provide the same capability on single nodes. + +.. code-block:: python + + import bonobo + + graph = bonobo.Graph() + + # Create a node without any connection, name it. + graph.add_node(foo, _name="foo") + + # Use it somewhere else as the data source. + graph.add_chain(..., _input="foo") + + # ... or as the data sink. + graph.add_chain(..., _output="foo") + + +Connecting two nodes +:::::::::::::::::::: + +You may want to connect two nodes at some point. You can use `add_chain` without nodes to achieve it. + +.. code-block:: python + + import bonobo + + graph = bonobo.Graph() + + # Create two "anonymous" nodes + graph.add_node(a) + graph.add_node(b) + + # Connect them + graph.add_chain(_input=a, _output=b) + + Inspecting graphs ::::::::::::::::: diff --git a/tests/structs/test_graphs.py b/tests/structs/test_graphs.py index fbca00c..725ba61 100644 --- a/tests/structs/test_graphs.py +++ b/tests/structs/test_graphs.py @@ -23,6 +23,27 @@ def test_graph_outputs_of(): assert len(g.outputs_of(0)) == 0 +def test_graph_index_of(): + g = Graph() + + g.add_node(sentinel.foo) + g.add_node(sentinel.bar) + + # sequential, can resolve objects + assert g.index_of(sentinel.foo) == 0 + assert g.index_of(sentinel.bar) == 1 + + # calling on an index should return the index + assert g.index_of(sentinel.bar) == g.index_of(g.index_of(sentinel.bar)) + + # not existing should raise value error + with pytest.raises(ValueError): + g.index_of(sentinel.not_there) + + # tokens resolve to themselves + assert g.index_of(BEGIN) == BEGIN + + def test_graph_add_component(): g = Graph() @@ -35,6 +56,19 @@ def test_graph_add_component(): assert len(g.nodes) == 2 +def test_invalid_graph_usage(): + g = Graph() + + with pytest.raises(ValueError): + g.add_chain() + + g.add_node(sentinel.foo) + g.add_node(sentinel.bar) + + with pytest.raises(RuntimeError): + g.add_chain(_input=sentinel.bar, _output=sentinel.foo, _name="this_is_not_possible") + + def test_graph_add_chain(): g = Graph() @@ -63,6 +97,41 @@ def test_graph_topological_sort(): assert g[4] == sentinel.b2 +def test_connect_two_chains(): + g = Graph() + + g.add_chain(sentinel.a1, sentinel.a2, _input=None, _output=None) + g.add_chain(sentinel.b1, sentinel.b2, _input=None, _output=None) + assert len(g.outputs_of(sentinel.a2)) == 0 + + g.add_chain(_input=sentinel.a2, _output=sentinel.b1) + assert g.outputs_of(sentinel.a2) == {g.index_of(sentinel.b1)} + + +def test_connect_two_anonymous_nodes(): + g = Graph() + + # Create two "anonymous" nodes + g.add_node(sentinel.a) + g.add_node(sentinel.b) + + # Connect them + g.add_chain(_input=sentinel.a, _output=sentinel.b) + + +def test_named_nodes(): + g = Graph() + + a, b, c, d, e, f = sentinel.a, sentinel.b, sentinel.c, sentinel.d, sentinel.e, sentinel.f + + # Here we mark _input to None, so normalize won't get the "begin" impulsion. + g.add_chain(e, f, _input=None, _name="load") + + # Add two different chains + g.add_chain(a, b, _output="load") + g.add_chain(c, d, _output="load") + + def test_copy(): g1 = Graph() g2 = g1.copy()