From 8de6f50523d74ff08d81a0764d19259393c705ae Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Wed, 5 Jul 2017 13:08:53 +0200 Subject: [PATCH] [examples] Fix examples, fix termination bug with unrecoverable errors. --- bonobo/examples/datasets/fablabs.py | 8 ++++---- bonobo/examples/nodes/filter.py | 7 +++++-- bonobo/examples/nodes/slow.py | 3 +++ bonobo/examples/tutorials/tut02e02_write.py | 2 +- bonobo/execution/node.py | 1 + bonobo/structs/inputs.py | 18 +++++++++++------- 6 files changed, 25 insertions(+), 14 deletions(-) diff --git a/bonobo/examples/datasets/fablabs.py b/bonobo/examples/datasets/fablabs.py index be95fe1..33ed91c 100644 --- a/bonobo/examples/datasets/fablabs.py +++ b/bonobo/examples/datasets/fablabs.py @@ -73,15 +73,15 @@ def display(row): print( ' - {}address{}: {address}'. - format(Fore.BLUE, Style.RESET_ALL, address=', '.join(address)) + format(Fore.BLUE, Style.RESET_ALL, address=', '.join(address)) ) print( ' - {}links{}: {links}'. - format(Fore.BLUE, Style.RESET_ALL, links=', '.join(row['links'])) + format(Fore.BLUE, Style.RESET_ALL, links=', '.join(row['links'])) ) print( ' - {}geometry{}: {geometry}'. - format(Fore.BLUE, Style.RESET_ALL, **row) + format(Fore.BLUE, Style.RESET_ALL, **row) ) print( ' - {}source{}: {source}'.format( @@ -96,8 +96,8 @@ graph = bonobo.Graph( ), normalize, filter_france, + bonobo.JsonWriter(path='fablabs.txt', ioformat='arg0'), bonobo.Tee(display), - bonobo.JsonWriter(path='fablabs.txt'), ) if __name__ == '__main__': diff --git a/bonobo/examples/nodes/filter.py b/bonobo/examples/nodes/filter.py index bf390e9..4f7219a 100644 --- a/bonobo/examples/nodes/filter.py +++ b/bonobo/examples/nodes/filter.py @@ -9,13 +9,16 @@ class OddOnlyFilter(Filter): @Filter -def MultiplesOfThreeOnlyFilter(self, i): +def multiples_of_three(i): return not (i % 3) graph = bonobo.Graph( lambda: tuple(range(50)), OddOnlyFilter(), - MultiplesOfThreeOnlyFilter(), + multiples_of_three, print, ) + +if __name__ == '__main__': + bonobo.run(graph) diff --git a/bonobo/examples/nodes/slow.py b/bonobo/examples/nodes/slow.py index b9623af..ecaaf44 100644 --- a/bonobo/examples/nodes/slow.py +++ b/bonobo/examples/nodes/slow.py @@ -14,3 +14,6 @@ graph = bonobo.Graph( pause, print, ) + +if __name__ == '__main__': + bonobo.run(graph) diff --git a/bonobo/examples/tutorials/tut02e02_write.py b/bonobo/examples/tutorials/tut02e02_write.py index 1d41ac2..664bca6 100644 --- a/bonobo/examples/tutorials/tut02e02_write.py +++ b/bonobo/examples/tutorials/tut02e02_write.py @@ -8,7 +8,7 @@ def split_one(line): graph = bonobo.Graph( bonobo.FileReader('coffeeshops.txt'), split_one, - bonobo.JsonWriter('coffeeshops.json'), + bonobo.JsonWriter('coffeeshops.json', ioformat='arg0'), ) if __name__ == '__main__': diff --git a/bonobo/execution/node.py b/bonobo/execution/node.py index 4edb75e..45691a6 100644 --- a/bonobo/execution/node.py +++ b/bonobo/execution/node.py @@ -95,6 +95,7 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): continue except UnrecoverableError as exc: self.handle_error(exc, traceback.format_exc()) + self.input.shutdown() break except Exception as exc: # pylint: disable=broad-except self.handle_error(exc, traceback.format_exc()) diff --git a/bonobo/structs/inputs.py b/bonobo/structs/inputs.py index cf9a6ec..7cfe12f 100644 --- a/bonobo/structs/inputs.py +++ b/bonobo/structs/inputs.py @@ -77,6 +77,12 @@ class Input(Queue, Readable, Writable): return Queue.put(self, data, block, timeout) + def _decrement_runlevel(self): + if self._runlevel == 1: + self.on_finalize() + self._runlevel -= 1 + self.on_end() + def get(self, block=True, timeout=None): if not self.alive: raise InactiveReadableError('Cannot get() on an inactive {}.'.format(Readable.__name__)) @@ -84,13 +90,7 @@ class Input(Queue, Readable, Writable): data = Queue.get(self, block, timeout) if data == END: - if self._runlevel == 1: - self.on_finalize() - - self._runlevel -= 1 - - # callback - self.on_end() + self._decrement_runlevel() if not self.alive: raise InactiveReadableError( @@ -100,6 +100,10 @@ class Input(Queue, Readable, Writable): return data + def shutdown(self): + while self._runlevel >= 1: + self._decrement_runlevel() + def empty(self): self.mutex.acquire() while self._qsize() and self.queue[0] == END: