work in progress ...

This commit is contained in:
Romain Dorgueil
2016-12-09 08:01:04 +01:00
parent 854ef4e2bf
commit 90d3b6235b
24 changed files with 822 additions and 85 deletions

View File

@ -1 +0,0 @@
from __future__ import absolute_import, print_function, unicode_literals

0
bonobo/core/__init__.py Normal file
View File

50
bonobo/core/errors.py Normal file
View File

@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
#
# Copyright 2012-2014 Romain Dorgueil
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
class AbstractError(NotImplementedError):
    """Error raised by a method that is declared but deliberately left unimplemented.

    The message names the owning class and the method, for instance
    ``Call to abstract method Readable.get(...): missing implementation.``
    """

    def __init__(self, method):
        """Build the error message from the abstract method that was called.

        :param method: the offending method, usually given as a bound method
            (``self.method`` or ``cls.method``).
        """
        owner = getattr(method, '__self__', None)
        if isinstance(owner, type):
            # Bound to a class (classmethod): the owner is the class itself.
            class_name = owner.__name__
        elif owner is not None:
            # Bound to an instance: instances have no ``__name__``, report their type.
            class_name = type(owner).__name__
        else:
            # Plain function: recover the class from the qualified name, if any.
            qualname = getattr(method, '__qualname__', '')
            class_name = qualname.rpartition('.')[0] or '<unknown>'

        super().__init__(
            'Call to abstract method {class_name}.{method_name}(...): missing implementation.'.format(
                class_name=class_name,
                method_name=method.__name__,
            )
        )
class InactiveIOError(IOError):
    """Base class for errors about using an input/output that is not active."""
class InactiveReadableError(InactiveIOError):
    """Raised when reading from a readable that is not active."""
class InactiveWritableError(InactiveIOError):
    """Raised when writing to a writable that is not active."""
class ValidationError(RuntimeError):
    """Runtime error reporting that an object failed validation."""

    def __init__(self, inst, message):
        """Compose the error text from the type name of ``inst`` and ``message``."""
        template = 'Validation error in {class_name}: {message}'
        text = template.format(class_name=type(inst).__name__, message=message)
        super().__init__(text)
class ProhibitedOperationError(RuntimeError):
    """Runtime error signalling an operation that is not allowed."""

27
bonobo/core/graph.py Normal file
View File

@ -0,0 +1,27 @@
from bonobo.core.tokens import BEGIN
class Graph:
    """
    Represents a coherent directed acyclic graph (DAG) of components.

    Components live in ``components`` and are identified by their index in that list.
    ``graph`` maps a component index (or the ``BEGIN`` token) to the set of component
    indexes it outputs to.
    """

    def __init__(self):
        self.components = []
        self.graph = {BEGIN: set()}

    def outputs_of(self, idx, create=False):
        """Return the set of component indexes that ``idx`` outputs to.

        :param idx: a component index, or ``BEGIN``.
        :param create: when true, register an empty output set for an unknown ``idx``
            instead of raising.
        :raises KeyError: if ``idx`` is unknown and ``create`` is false.
        """
        if create:
            return self.graph.setdefault(idx, set())
        return self.graph[idx]

    def add_component(self, c):
        """Append component ``c`` and return its index.

        An empty output set is registered for the new component, so ``outputs_of()``
        also works for components that have no successor.
        """
        i = len(self.components)
        self.components.append(c)
        self.graph.setdefault(i, set())
        return i

    def add_chain(self, *components, input=BEGIN):
        """Add ``components`` linked one after another, the first one fed by ``input``.

        :param components: the components to add, in chain order.
        :param input: index (or token) whose output feeds the first component.
        """
        # ``input`` shadows a builtin but is part of the keyword interface; work on a
        # local alias instead of rebinding it.
        previous = input
        for component in components:
            current = self.add_component(component)
            self.outputs_of(previous, create=True).add(current)
            previous = current

93
bonobo/core/io.py Normal file
View File

@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-
#
# Copyright 2012-2014 Romain Dorgueil
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABCMeta, abstractmethod
from queue import Queue
from bonobo.core.errors import AbstractError, InactiveWritableError, InactiveReadableError
from bonobo.core.tokens import BEGIN, END
# Default ``maxsize`` that Input passes to the underlying Queue.
BUFFER_SIZE = 8192
class Readable(metaclass=ABCMeta):
    """Abstract interface implemented by anything data can be read from."""

    @abstractmethod
    def get(self, block=True, timeout=None):
        """Read one item; ``block`` and ``timeout`` exist for Queue compatibility."""
        unimplemented = self.get
        raise AbstractError(unimplemented)
class Writable(metaclass=ABCMeta):
    """Abstract interface implemented by anything data can be written to."""

    @abstractmethod
    def put(self, data, block=True, timeout=None):
        """Write one item; ``block`` and ``timeout`` exist for Queue compatibility."""
        unimplemented = self.put
        raise AbstractError(unimplemented)
class Input(Queue, Readable, Writable):
    """Queue-backed input whose activity is driven by ``BEGIN`` and ``END`` tokens.

    Putting ``BEGIN`` raises both runlevels. An ``END`` lowers the writable runlevel
    when it is put, and the readable runlevel when it is consumed. The input is
    ``alive`` while the readable runlevel is positive.
    """

    def __init__(self, maxsize=BUFFER_SIZE):
        Queue.__init__(self, maxsize)
        self._runlevel = 0
        self._writable_runlevel = 0

    def put(self, data, block=True, timeout=None):
        """Write ``data`` to the queue.

        ``BEGIN`` is not enqueued, it only raises the runlevels. ``END`` is enqueued
        and lowers the writable runlevel.

        :raises InactiveWritableError: if the writable runlevel is below 1.
        """
        # Tokens are sentinels: compare by identity so payloads with a custom
        # ``__eq__`` (array-like rows, for instance) are never asked for equality.
        if data is BEGIN:
            self._runlevel += 1
            self._writable_runlevel += 1
            return

        # Check we are actually able to receive data.
        if self._writable_runlevel < 1:
            raise InactiveWritableError('Cannot put() on an inactive {}.'.format(Writable.__name__))

        if data is END:
            self._writable_runlevel -= 1

        return Queue.put(self, data, block, timeout)

    def get(self, block=True, timeout=None):
        """Read the next data item, consuming any ``END`` tokens met on the way.

        :raises InactiveReadableError: if the input is not alive, or stops being alive
            because the ``END`` just consumed brought the runlevel to 0.
        """
        # Loop rather than recurse, so a long run of END tokens cannot grow the stack.
        while True:
            if not self.alive:
                raise InactiveReadableError('Cannot get() on an inactive {}.'.format(Readable.__name__))

            data = Queue.get(self, block, timeout)
            if data is not END:
                return data

            self._runlevel -= 1
            if not self.alive:
                raise InactiveReadableError(
                    'Cannot get() on an inactive {} (runlevel just reached 0).'.format(Readable.__name__))

    def empty(self):
        """Tell whether no data is left, first discarding ``END`` tokens at the head."""
        # ``with`` guarantees the mutex is released even if something raises.
        with self.mutex:
            while self._qsize() and self.queue[0] is END:
                self._runlevel -= 1
                Queue._get(self)
                # A slot was freed: wake up a producer that may be blocked in put().
                self.not_full.notify()

        # Queue.empty() takes the (non reentrant) mutex itself, so call it outside.
        return Queue.empty(self)

    @property
    def alive(self):
        """True while the readable runlevel is positive."""
        return self._runlevel > 0

30
bonobo/core/stats.py Normal file
View File

@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
#
# Copyright 2012-2014 Romain Dorgueil
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABCMeta, abstractmethod
from bonobo.core.errors import AbstractError
class WithStatistics(metaclass=ABCMeta):
    """Abstract interface for objects that expose named counters."""

    @abstractmethod
    def get_stats(self, *args, **kwargs):
        """Return an iterable of ``(name, count)`` pairs; subclasses must implement it."""
        raise AbstractError(self.get_stats)

    def get_stats_as_string(self, *args, **kwargs):
        """Render the strictly positive counters as space-separated ``name=count`` items."""
        parts = []
        for name, cnt in self.get_stats(*args, **kwargs):
            if cnt > 0:
                parts.append('{0}={1}'.format(name, cnt))
        return ' '.join(parts)

77
bonobo/core/strategy.py Normal file
View File

@ -0,0 +1,77 @@
import time
from concurrent.futures import Executor
from queue import Queue, Empty
from bonobo.core.io import Input
from bonobo.core.tokens import BEGIN
from bonobo.util.iterators import force_iterator
class Strategy:
    """Base class for the ways a graph can be executed."""

    def execute(self, graph, *args, **kwargs):
        """Run ``graph``; concrete strategies must override this method."""
        raise NotImplementedError()
class NaiveStrategy(Strategy):
    """Run the components one after another, in a single thread.

    Components are treated as a linear chain in ``graph.components`` order. The first
    one is called once without argument, and every following one is called once per
    row produced by its predecessor.
    """

    def execute(self, graph, *args, **kwargs):
        """Execute every component of ``graph`` sequentially.

        ``args`` and ``kwargs`` are accepted for interface compatibility but not used.
        """
        components = graph.components
        last = len(components) - 1
        input_queues = {i: Queue() for i in range(len(components))}

        for i, component in enumerate(components):
            while True:
                if i:
                    # Only the queue read may end the loop: an Empty raised by the
                    # component itself is a real error and is left to propagate.
                    try:
                        call_args = (input_queues[i].get(block=False),)
                    except Empty:
                        break
                else:
                    call_args = ()

                for row in force_iterator(component(*call_args)):
                    # The last component has no consumer: its output is still iterated
                    # (to drive generators) but there is no queue to forward it to.
                    if i < last:
                        input_queues[i + 1].put(row)

                if not i:
                    # The first component has no input, it runs exactly once.
                    break
class ExecutionContext:
    """Holds the graph that an execution operates on."""

    def __init__(self, graph):
        """Keep a reference to ``graph`` in the ``graph`` attribute."""
        self.graph = graph
class ExecutorStrategy(Strategy):
    """Strategy that runs graph components as tasks submitted to an executor.

    Each component call is submitted to ``self.executor``. When a call completes, every
    result it produced is passed to each of the component's outputs, as new calls.
    """

    context_type = ExecutionContext
    # NOTE(review): the ``Executor`` base class does not implement ``submit()``; pass a
    # concrete executor or override ``executor_type`` in a subclass.
    executor_type = Executor

    def __init__(self, executor=None):
        """
        :param executor: a ``concurrent.futures`` executor; ``executor_type`` is
            instantiated when it is omitted.
        """
        self.executor = executor or self.executor_type()
        # Context of the execution in progress, set by execute().
        self.context = None
        # Futures submitted and not fully handled yet; execute() waits for it to drain.
        self.running = set()

    def create_context(self, graph, *args, **kwargs):
        """Create the execution context for ``graph``."""
        return self.context_type(graph)

    def execute(self, graph, *args, **kwargs):
        """Run ``graph`` and block until no component call is pending anymore.

        ``args`` and ``kwargs`` are passed to the components directly connected to
        ``BEGIN``.
        """
        self.context = self.create_context(graph)

        for i in graph.outputs_of(BEGIN):
            self.call_component(i, *args, **kwargs)

        while len(self.running):
            time.sleep(0.1)

    def call_component(self, idx, *args, **kwargs):
        """Submit component ``idx`` to the executor and chain its results to its outputs.

        :param idx: index of the component in the graph's ``components`` list.
        """
        graph = self.context.graph
        f = self.executor.submit(graph.components[idx], *args, **kwargs)
        self.running.add(f)

        @f.add_done_callback
        def on_component_done(done):
            try:
                try:
                    outputs = graph.outputs_of(idx)
                except KeyError:
                    # No output registered for this component: nothing to forward to.
                    outputs = ()

                for result in force_iterator(done.result()):
                    for output in outputs:
                        self.call_component(output, result)
            finally:
                # Always drop the future, even if the component failed, otherwise
                # execute() would wait forever. Successors were submitted above, so
                # ``running`` cannot look empty while work remains.
                self.running.discard(done)

    def __run_component(self, component):
        # NOTE(review): work-in-progress helper that nothing in this module calls. A
        # freshly created Input is not alive, so this loop never runs as written.
        c_in = Input()

        while c_in.alive:
            row = c_in.get()
            component(row)

12
bonobo/core/tokens.py Normal file
View File

@ -0,0 +1,12 @@
class Token:
    """Named marker object, for signal oriented queue messages or other token types."""

    def __init__(self, name):
        # Kept under ``__name__`` so a token exposes its name like classes and functions do.
        self.__name__ = name

    def __repr__(self):
        label = self.__name__
        return '<{}>'.format(label)
# Token that raises the runlevel of an Input when put on it (it is not enqueued).
BEGIN = Token('Begin')
# Token that lowers the runlevel of an Input, marking the end of a stream of data.
END = Token('End')

0
bonobo/util/__init__.py Normal file
View File

7
bonobo/util/iterators.py Normal file
View File

@ -0,0 +1,7 @@
def force_iterator(x):
    """Return something that can be iterated over, whatever ``x`` is.

    * A string is treated as a single value and returned wrapped in a list.
    * Any other iterable is returned as an iterator.
    * A non-iterable value is returned wrapped in a list, unless it is falsy
      (``None``, ``0``, ``False``...), in which case an empty list is returned.
    """
    if isinstance(x, str):
        return [x]

    try:
        return iter(x)
    except TypeError:
        # iter() raises TypeError for non-iterables. Anything else comes from the
        # object's own __iter__ and is a real error that must not be hidden.
        return [x] if x else []