More examples, fix erroneous input stat in execution context.
This commit is contained in:
@ -156,9 +156,13 @@ class ComponentExecutionContext(WithStatistics):
|
|||||||
self.input.put(value)
|
self.input.put(value)
|
||||||
|
|
||||||
def get(self):
|
def get(self):
|
||||||
# todo XXX if timeout, in stat is erroneous
|
"""
|
||||||
|
Get from the queue first, then increment stats, so if Queue raise Timeout or Empty, stat won't be changed.
|
||||||
|
|
||||||
|
"""
|
||||||
|
row = self.input.get(timeout=1)
|
||||||
self.stats['in'] += 1
|
self.stats['in'] += 1
|
||||||
return self.input.get(timeout=1)
|
return row
|
||||||
|
|
||||||
def _call(self, bag):
|
def _call(self, bag):
|
||||||
# todo add timer
|
# todo add timer
|
||||||
|
|||||||
33
examples/basic_extract_transform_load_of_bags.py
Normal file
33
examples/basic_extract_transform_load_of_bags.py
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
import time
|
||||||
|
from random import randint
|
||||||
|
|
||||||
|
from bonobo import Bag
|
||||||
|
from bonobo.core.graphs import Graph
|
||||||
|
from bonobo.core.strategies.executor import ThreadPoolExecutorStrategy
|
||||||
|
from bonobo.ext.console import ConsoleOutputPlugin
|
||||||
|
|
||||||
|
|
||||||
|
def extract():
|
||||||
|
yield Bag(topic='foo')
|
||||||
|
yield Bag(topic='bar')
|
||||||
|
yield Bag(topic='baz')
|
||||||
|
|
||||||
|
|
||||||
|
def transform(topic: str):
|
||||||
|
wait = randint(0, 1)
|
||||||
|
time.sleep(wait)
|
||||||
|
return Bag.inherit(title=topic.title(), wait=wait)
|
||||||
|
|
||||||
|
|
||||||
|
def load(topic: str, title: str, wait: int):
|
||||||
|
print('{} ({}) wait={}'.format(title, topic, wait))
|
||||||
|
|
||||||
|
|
||||||
|
Strategy = ThreadPoolExecutorStrategy
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
etl = Graph()
|
||||||
|
etl.add_chain(extract, transform, load)
|
||||||
|
|
||||||
|
s = Strategy()
|
||||||
|
s.execute(etl, plugins=[ConsoleOutputPlugin()])
|
||||||
@ -0,0 +1,35 @@
|
|||||||
|
import time
|
||||||
|
from random import randint
|
||||||
|
|
||||||
|
from bonobo.core.graphs import Graph
|
||||||
|
from bonobo.core.strategies.executor import ThreadPoolExecutorStrategy
|
||||||
|
from bonobo.ext.console import ConsoleOutputPlugin
|
||||||
|
|
||||||
|
|
||||||
|
def extract():
|
||||||
|
yield {'topic': 'foo'}
|
||||||
|
yield {'topic': 'bar'}
|
||||||
|
yield {'topic': 'baz'}
|
||||||
|
|
||||||
|
|
||||||
|
def transform(row):
|
||||||
|
wait = randint(0, 1)
|
||||||
|
time.sleep(wait)
|
||||||
|
return {
|
||||||
|
'topic': row['topic'].title(),
|
||||||
|
'wait': wait,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def load(s):
|
||||||
|
print(s)
|
||||||
|
|
||||||
|
|
||||||
|
Strategy = ThreadPoolExecutorStrategy
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
etl = Graph()
|
||||||
|
etl.add_chain(extract, transform, load)
|
||||||
|
|
||||||
|
s = Strategy()
|
||||||
|
s.execute(etl, plugins=[ConsoleOutputPlugin()])
|
||||||
|
|||||||
Reference in New Issue
Block a user