from collections import namedtuple import bonobo from bonobo.config import use_raw_input from bonobo.execution.contexts import GraphExecutionContext from bonobo.util.bags import BagType Extracted = namedtuple("Extracted", ["id", "name", "value"]) ExtractedBT = BagType("ExtractedBT", ["id", "name", "value"]) def extract_nt(): yield Extracted(id=1, name="Guido", value=".py") yield Extracted(id=2, name="Larry", value=".pl") yield Extracted(id=3, name="Dennis", value=".c") yield Extracted(id=4, name="Yukihiro", value=".rb") def extract_bt(): yield ExtractedBT(id=1, name="Guido", value=".py") yield ExtractedBT(id=2, name="Larry", value=".pl") yield ExtractedBT(id=3, name="Dennis", value=".c") yield ExtractedBT(id=4, name="Yukihiro", value=".rb") def transform_using_args(id, name, value): yield Extracted(id=id * 2, name=name, value=name.lower() + value) @use_raw_input def transform_nt(row): yield row._replace(name=row.name.upper()) def StoreInList(buffer: list): def store_in_list(*args, buffer=buffer): buffer.append(args) return store_in_list def test_execution(): graph = bonobo.Graph() result_args = [] result_nt = [] result_bt = [] graph.add_chain(extract_nt, transform_using_args, StoreInList(result_args)) graph.add_chain(transform_nt, StoreInList(result_nt), _input=extract_nt) graph.add_chain(extract_bt, transform_using_args, StoreInList(result_bt)) with GraphExecutionContext(graph) as context: context.run_until_complete() assert result_args == [ (2, "Guido", "guido.py"), (4, "Larry", "larry.pl"), (6, "Dennis", "dennis.c"), (8, "Yukihiro", "yukihiro.rb"), ] assert result_nt == [(1, "GUIDO", ".py"), (2, "LARRY", ".pl"), (3, "DENNIS", ".c"), (4, "YUKIHIRO", ".rb")] assert result_bt == [ (2, "Guido", "guido.py"), (4, "Larry", "larry.pl"), (6, "Dennis", "dennis.c"), (8, "Yukihiro", "yukihiro.rb"), ]