[examples] Fix examples, fix termination bug with unrecoverable errors.

This commit is contained in:
Romain Dorgueil
2017-07-05 13:08:53 +02:00
parent 9801c75720
commit 8de6f50523
6 changed files with 25 additions and 14 deletions

View File

@ -73,15 +73,15 @@ def display(row):
print(
' - {}address{}: {address}'.
format(Fore.BLUE, Style.RESET_ALL, address=', '.join(address))
format(Fore.BLUE, Style.RESET_ALL, address=', '.join(address))
)
print(
' - {}links{}: {links}'.
format(Fore.BLUE, Style.RESET_ALL, links=', '.join(row['links']))
format(Fore.BLUE, Style.RESET_ALL, links=', '.join(row['links']))
)
print(
' - {}geometry{}: {geometry}'.
format(Fore.BLUE, Style.RESET_ALL, **row)
format(Fore.BLUE, Style.RESET_ALL, **row)
)
print(
' - {}source{}: {source}'.format(
@ -96,8 +96,8 @@ graph = bonobo.Graph(
),
normalize,
filter_france,
bonobo.JsonWriter(path='fablabs.txt', ioformat='arg0'),
bonobo.Tee(display),
bonobo.JsonWriter(path='fablabs.txt'),
)
if __name__ == '__main__':

View File

@ -9,13 +9,16 @@ class OddOnlyFilter(Filter):
@Filter
def MultiplesOfThreeOnlyFilter(self, i):
def multiples_of_three(i):
return not (i % 3)
graph = bonobo.Graph(
lambda: tuple(range(50)),
OddOnlyFilter(),
MultiplesOfThreeOnlyFilter(),
multiples_of_three,
print,
)
if __name__ == '__main__':
bonobo.run(graph)

View File

@ -14,3 +14,6 @@ graph = bonobo.Graph(
pause,
print,
)
if __name__ == '__main__':
bonobo.run(graph)

View File

@ -8,7 +8,7 @@ def split_one(line):
graph = bonobo.Graph(
bonobo.FileReader('coffeeshops.txt'),
split_one,
bonobo.JsonWriter('coffeeshops.json'),
bonobo.JsonWriter('coffeeshops.json', ioformat='arg0'),
)
if __name__ == '__main__':

View File

@ -95,6 +95,7 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
continue
except UnrecoverableError as exc:
self.handle_error(exc, traceback.format_exc())
self.input.shutdown()
break
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())

View File

@ -77,6 +77,12 @@ class Input(Queue, Readable, Writable):
return Queue.put(self, data, block, timeout)
def _decrement_runlevel(self):
if self._runlevel == 1:
self.on_finalize()
self._runlevel -= 1
self.on_end()
def get(self, block=True, timeout=None):
if not self.alive:
raise InactiveReadableError('Cannot get() on an inactive {}.'.format(Readable.__name__))
@ -84,13 +90,7 @@ class Input(Queue, Readable, Writable):
data = Queue.get(self, block, timeout)
if data == END:
if self._runlevel == 1:
self.on_finalize()
self._runlevel -= 1
# callback
self.on_end()
self._decrement_runlevel()
if not self.alive:
raise InactiveReadableError(
@ -100,6 +100,10 @@ class Input(Queue, Readable, Writable):
return data
def shutdown(self):
while self._runlevel >= 1:
self._decrement_runlevel()
def empty(self):
self.mutex.acquire()
while self._qsize() and self.queue[0] == END: