diff --git a/.gitignore b/.gitignore index d48b40b..f16c58c 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ *,cover *.egg *.egg-info/ +*.iml *.log *.manifest *.mo @@ -20,25 +21,17 @@ .installed.cfg .ipynb_checkpoints .python-version -.tox/ -.webassets-cache /.idea /.release -/bonobo.iml /bonobo/examples/work_in_progress/ /bonobo/ext/jupyter/js/node_modules/ /build/ /coverage.xml -/develop-eggs/ /dist/ /docs/_build/ -/downloads/ /eggs/ /examples/private -/htmlcov/ /sdist/ /tags -celerybeat-schedule -parts/ pip-delete-this-directory.txt pip-log.txt diff --git a/bonobo/logging.py b/bonobo/logging.py index 4626243..17bdeb7 100644 --- a/bonobo/logging.py +++ b/bonobo/logging.py @@ -70,9 +70,9 @@ def set_level(level): def get_logger(name='bonobo'): return logging.getLogger(name) + # Compatibility with python logging getLogger = get_logger - # Setup formating and level. setup(level=settings.LOGGING_LEVEL) diff --git a/bonobo/nodes/io/base.py b/bonobo/nodes/io/base.py new file mode 100644 index 0000000..d9b3212 --- /dev/null +++ b/bonobo/nodes/io/base.py @@ -0,0 +1,82 @@ +from bonobo import settings +from bonobo.config import Configurable, ContextProcessor, Option, Service +from bonobo.structs.bags import Bag + + +class IOFormatEnabled(Configurable): + ioformat = Option(default=settings.IOFORMAT.get) + + def get_input(self, *args, **kwargs): + if self.ioformat == settings.IOFORMAT_ARG0: + if len(args) != 1 or len(kwargs): + raise ValueError( + 'Wrong input formating: IOFORMAT=ARG0 implies one arg and no kwargs, got args={!r} and kwargs={!r}.'. + format(args, kwargs) + ) + return args[0] + + if self.ioformat == settings.IOFORMAT_KWARGS: + if len(args) or not len(kwargs): + raise ValueError( + 'Wrong input formating: IOFORMAT=KWARGS ioformat implies no arg, got args={!r} and kwargs={!r}.'. + format(args, kwargs) + ) + return kwargs + + raise NotImplementedError('Unsupported format.') + + def get_output(self, row): + if self.ioformat == settings.IOFORMAT_ARG0: + return row + + if self.ioformat == settings.IOFORMAT_KWARGS: + return Bag(**row) + + raise NotImplementedError('Unsupported format.') + + +class FileHandler(Configurable): + """Abstract component factory for file-related components. + + Args: + path (str): which path to use within the provided filesystem. + eol (str): which character to use to separate lines. + mode (str): which mode to use when opening the file. + fs (str): service name to use for filesystem. + """ + + path = Option(str, required=True, positional=True) # type: str + eol = Option(str, default='\n') # type: str + mode = Option(str) # type: str + encoding = Option(str, default='utf-8') # type: str + fs = Service('fs') # type: str + + @ContextProcessor + def file(self, context, fs): + with self.open(fs) as file: + yield file + + def open(self, fs): + return fs.open(self.path, self.mode, encoding=self.encoding) + + +class Reader: + """Abstract component factory for readers. + """ + + def __call__(self, *args, **kwargs): + yield from self.read(*args, **kwargs) + + def read(self, *args, **kwargs): + raise NotImplementedError('Abstract.') + + +class Writer: + """Abstract component factory for writers. + """ + + def __call__(self, *args, **kwargs): + return self.write(*args, **kwargs) + + def write(self, *args, **kwargs): + raise NotImplementedError('Abstract.') diff --git a/bonobo/nodes/io/csv.py b/bonobo/nodes/io/csv.py index bf3872d..ae68bd0 100644 --- a/bonobo/nodes/io/csv.py +++ b/bonobo/nodes/io/csv.py @@ -3,7 +3,8 @@ import csv from bonobo.config import Option from bonobo.config.processors import ContextProcessor from bonobo.constants import NOT_MODIFIED -from bonobo.nodes.io.file import FileHandler, FileReader, FileWriter +from bonobo.nodes.io.file import FileReader, FileWriter +from bonobo.nodes.io.base import FileHandler, IOFormatEnabled from bonobo.util.objects import ValueHolder @@ -28,7 +29,7 @@ class CsvHandler(FileHandler): headers = Option(tuple) -class CsvReader(CsvHandler, FileReader): +class CsvReader(IOFormatEnabled, FileReader, CsvHandler): """ Reads a CSV and yield the values as dicts. @@ -59,12 +60,12 @@ class CsvReader(CsvHandler, FileReader): for row in reader: if len(row) != field_count: - raise ValueError('Got a line with %d fields, expecting %d.' % (len(row), field_count,)) + raise ValueError('Got a line with %d fields, expecting %d.' % (len(row), field_count, )) yield self.get_output(dict(zip(_headers, row))) -class CsvWriter(CsvHandler, FileWriter): +class CsvWriter(IOFormatEnabled, FileWriter, CsvHandler): @ContextProcessor def writer(self, context, fs, file, lineno): writer = csv.writer(file, delimiter=self.delimiter, quotechar=self.quotechar, lineterminator=self.eol) diff --git a/bonobo/nodes/io/file.py b/bonobo/nodes/io/file.py index 53ba138..e49d6de 100644 --- a/bonobo/nodes/io/file.py +++ b/bonobo/nodes/io/file.py @@ -1,81 +1,11 @@ -from bonobo import settings -from bonobo.config import Option, Service -from bonobo.config.configurables import Configurable +from bonobo.config import Option from bonobo.config.processors import ContextProcessor from bonobo.constants import NOT_MODIFIED -from bonobo.structs.bags import Bag +from bonobo.nodes.io.base import FileHandler, Reader, Writer from bonobo.util.objects import ValueHolder -class FileHandler(Configurable): - """Abstract component factory for file-related components. - - Args: - path (str): which path to use within the provided filesystem. - eol (str): which character to use to separate lines. - mode (str): which mode to use when opening the file. - fs (str): service name to use for filesystem. - """ - - path = Option(str, required=True, positional=True) # type: str - eol = Option(str, default='\n') # type: str - mode = Option(str) # type: str - encoding = Option(str, default='utf-8') # type: str - fs = Service('fs') # type: str - ioformat = Option(default=settings.IOFORMAT.get) - - @ContextProcessor - def file(self, context, fs): - with self.open(fs) as file: - yield file - - def open(self, fs): - return fs.open(self.path, self.mode, encoding=self.encoding) - - def get_input(self, *args, **kwargs): - if self.ioformat == settings.IOFORMAT_ARG0: - assert len(args) == 1 and not len(kwargs), 'ARG0 format implies one arg and no kwargs.' - return args[0] - - if self.ioformat == settings.IOFORMAT_KWARGS: - assert len(args) == 0 and len(kwargs), 'KWARGS format implies no arg.' - return kwargs - - raise NotImplementedError('Unsupported format.') - - def get_output(self, row): - if self.ioformat == settings.IOFORMAT_ARG0: - return row - - if self.ioformat == settings.IOFORMAT_KWARGS: - return Bag(**row) - - raise NotImplementedError('Unsupported format.') - - -class Reader(FileHandler): - """Abstract component factory for readers. - """ - - def __call__(self, *args, **kwargs): - yield from self.read(*args, **kwargs) - - def read(self, *args, **kwargs): - raise NotImplementedError('Abstract.') - - -class Writer(FileHandler): - """Abstract component factory for writers. - """ - - def __call__(self, *args, **kwargs): - return self.write(*args) - - def write(self, *args, **kwargs): - raise NotImplementedError('Abstract.') - - -class FileReader(Reader): +class FileReader(Reader, FileHandler): """Component factory for file-like readers. On its own, it can be used to read a file and yield one row per line, trimming the "eol" character at the end if @@ -93,7 +23,7 @@ class FileReader(Reader): yield line.rstrip(self.eol) -class FileWriter(Writer): +class FileWriter(Writer, FileHandler): """Component factory for file or file-like writers. On its own, it can be used to write in a file one line per row that comes into this component. Extending it is @@ -107,11 +37,11 @@ class FileWriter(Writer): lineno = ValueHolder(0) yield lineno - def write(self, fs, file, lineno, row): + def write(self, fs, file, lineno, line): """ Write a row on the next line of opened file in context. """ - self._write_line(file, (self.eol if lineno.value else '') + row) + self._write_line(file, (self.eol if lineno.value else '') + line) lineno += 1 return NOT_MODIFIED diff --git a/bonobo/nodes/io/json.py b/bonobo/nodes/io/json.py index f355c02..c6d9bf5 100644 --- a/bonobo/nodes/io/json.py +++ b/bonobo/nodes/io/json.py @@ -1,15 +1,17 @@ import json from bonobo.config.processors import ContextProcessor -from bonobo.nodes.io.file import FileWriter, FileReader +from bonobo.constants import NOT_MODIFIED +from bonobo.nodes.io.base import FileHandler, IOFormatEnabled +from bonobo.nodes.io.file import FileReader, FileWriter -class JsonHandler(): +class JsonHandler(FileHandler): eol = ',\n' prefix, suffix = '[', ']' -class JsonReader(JsonHandler, FileReader): +class JsonReader(IOFormatEnabled, FileReader, JsonHandler): loader = staticmethod(json.load) def read(self, fs, file): @@ -17,18 +19,21 @@ class JsonReader(JsonHandler, FileReader): yield self.get_output(line) -class JsonWriter(JsonHandler, FileWriter): +class JsonWriter(IOFormatEnabled, FileWriter, JsonHandler): @ContextProcessor def envelope(self, context, fs, file, lineno): file.write(self.prefix) yield file.write(self.suffix) - def write(self, fs, file, lineno, row): + def write(self, fs, file, lineno, *args, **kwargs): """ Write a json row on the next line of file pointed by ctx.file. :param ctx: :param row: """ - return super().write(fs, file, lineno, json.dumps(row)) + row = self.get_input(*args, **kwargs) + self._write_line(file, (self.eol if lineno.value else '') + json.dumps(row)) + lineno += 1 + return NOT_MODIFIED diff --git a/bonobo/nodes/io/pickle.py b/bonobo/nodes/io/pickle.py index c603e91..e94f94a 100644 --- a/bonobo/nodes/io/pickle.py +++ b/bonobo/nodes/io/pickle.py @@ -1,10 +1,11 @@ import pickle -from bonobo.config.processors import ContextProcessor from bonobo.config import Option +from bonobo.config.processors import ContextProcessor from bonobo.constants import NOT_MODIFIED +from bonobo.nodes.io.base import FileHandler, IOFormatEnabled +from bonobo.nodes.io.file import FileReader, FileWriter from bonobo.util.objects import ValueHolder -from .file import FileReader, FileWriter, FileHandler class PickleHandler(FileHandler): @@ -19,7 +20,7 @@ class PickleHandler(FileHandler): item_names = Option(tuple) -class PickleReader(PickleHandler, FileReader): +class PickleReader(IOFormatEnabled, FileReader, PickleHandler): """ Reads a Python pickle object and yields the items in dicts. """ @@ -56,8 +57,7 @@ class PickleReader(PickleHandler, FileReader): yield self.get_output(dict(zip(i)) if is_dict else dict(zip(pickle_headers.value, i))) -class PickleWriter(PickleHandler, FileWriter): - +class PickleWriter(IOFormatEnabled, FileWriter, PickleHandler): mode = Option(str, default='wb') def write(self, fs, file, lineno, item): diff --git a/bonobo/util/testing.py b/bonobo/util/testing.py index d5b6cc8..7c07256 100644 --- a/bonobo/util/testing.py +++ b/bonobo/util/testing.py @@ -1,6 +1,7 @@ from contextlib import contextmanager from unittest.mock import MagicMock +from bonobo import open_fs from bonobo.execution.node import NodeExecutionContext @@ -17,3 +18,20 @@ def optional_contextmanager(cm, *, ignore=False): else: with cm: yield + + +class FilesystemTester: + def __init__(self, extension='txt', mode='w'): + self.extension = extension + self.input_data = '' + self.mode = mode + + def get_services_for_reader(self, tmpdir): + fs, filename = open_fs(tmpdir), 'input.' + self.extension + with fs.open(filename, self.mode) as fp: + fp.write(self.input_data) + return fs, filename, {'fs': fs} + + def get_services_for_writer(self, tmpdir): + fs, filename = open_fs(tmpdir), 'output.' + self.extension + return fs, filename, {'fs': fs} diff --git a/config/__init__.py b/config/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/io/test_csv.py b/tests/io/test_csv.py index 47e641b..dc6f71c 100644 --- a/tests/io/test_csv.py +++ b/tests/io/test_csv.py @@ -1,23 +1,21 @@ import pytest -from bonobo import Bag, CsvReader, CsvWriter, open_fs, settings +from bonobo import Bag, CsvReader, CsvWriter, settings from bonobo.constants import BEGIN, END from bonobo.execution.node import NodeExecutionContext -from bonobo.util.testing import CapturingNodeExecutionContext +from bonobo.util.testing import CapturingNodeExecutionContext, FilesystemTester + +csv_tester = FilesystemTester('csv') +csv_tester.input_data = 'a,b,c\na foo,b foo,c foo\na bar,b bar,c bar' -def test_write_csv_to_file(tmpdir): - fs, filename = open_fs(tmpdir), 'output.csv' +def test_write_csv_to_file_arg0(tmpdir): + fs, filename, services = csv_tester.get_services_for_writer(tmpdir) - writer = CsvWriter(path=filename, ioformat=settings.IOFORMAT_ARG0) - context = NodeExecutionContext(writer, services={'fs': fs}) - - context.write(BEGIN, Bag({'foo': 'bar'}), Bag({'foo': 'baz', 'ignore': 'this'}), END) - - context.start() - context.step() - context.step() - context.stop() + with NodeExecutionContext(CsvWriter(path=filename, ioformat=settings.IOFORMAT_ARG0), services=services) as context: + context.write(BEGIN, Bag({'foo': 'bar'}), Bag({'foo': 'baz', 'ignore': 'this'}), END) + context.step() + context.step() with fs.open(filename) as fp: assert fp.read() == 'foo\nbar\nbaz\n' @@ -26,19 +24,33 @@ def test_write_csv_to_file(tmpdir): getattr(context, 'file') -def test_read_csv_from_file(tmpdir): - fs, filename = open_fs(tmpdir), 'input.csv' - with fs.open(filename, 'w') as fp: - fp.write('a,b,c\na foo,b foo,c foo\na bar,b bar,c bar') +@pytest.mark.parametrize('add_kwargs', ({}, { + 'ioformat': settings.IOFORMAT_KWARGS, +},)) +def test_write_csv_to_file_kwargs(tmpdir, add_kwargs): + fs, filename, services = csv_tester.get_services_for_writer(tmpdir) - reader = CsvReader(path=filename, delimiter=',', ioformat=settings.IOFORMAT_ARG0) + with NodeExecutionContext(CsvWriter(path=filename, **add_kwargs), services=services) as context: + context.write(BEGIN, Bag(**{'foo': 'bar'}), Bag(**{'foo': 'baz', 'ignore': 'this'}), END) + context.step() + context.step() - context = CapturingNodeExecutionContext(reader, services={'fs': fs}) + with fs.open(filename) as fp: + assert fp.read() == 'foo\nbar\nbaz\n' - context.start() - context.write(BEGIN, Bag(), END) - context.step() - context.stop() + with pytest.raises(AttributeError): + getattr(context, 'file') + + +def test_read_csv_from_file_arg0(tmpdir): + fs, filename, services = csv_tester.get_services_for_reader(tmpdir) + + with CapturingNodeExecutionContext( + CsvReader(path=filename, delimiter=',', ioformat=settings.IOFORMAT_ARG0), + services=services, + ) as context: + context.write(BEGIN, Bag(), END) + context.step() assert len(context.send.mock_calls) == 2 @@ -59,19 +71,15 @@ def test_read_csv_from_file(tmpdir): } -def test_read_csv_kwargs_output_formater(tmpdir): - fs, filename = open_fs(tmpdir), 'input.csv' - with fs.open(filename, 'w') as fp: - fp.write('a,b,c\na foo,b foo,c foo\na bar,b bar,c bar') +def test_read_csv_from_file_kwargs(tmpdir): + fs, filename, services = csv_tester.get_services_for_reader(tmpdir) - reader = CsvReader(path=filename, delimiter=',') - - context = CapturingNodeExecutionContext(reader, services={'fs': fs}) - - context.start() - context.write(BEGIN, Bag(), END) - context.step() - context.stop() + with CapturingNodeExecutionContext( + CsvReader(path=filename, delimiter=','), + services=services, + ) as context: + context.write(BEGIN, Bag(), END) + context.step() assert len(context.send.mock_calls) == 2 diff --git a/tests/io/test_file.py b/tests/io/test_file.py index 1566b39..07a15eb 100644 --- a/tests/io/test_file.py +++ b/tests/io/test_file.py @@ -1,9 +1,22 @@ import pytest -from bonobo import Bag, FileReader, FileWriter, open_fs +from bonobo import Bag, FileReader, FileWriter from bonobo.constants import BEGIN, END from bonobo.execution.node import NodeExecutionContext -from bonobo.util.testing import CapturingNodeExecutionContext +from bonobo.util.testing import CapturingNodeExecutionContext, FilesystemTester + +txt_tester = FilesystemTester('txt') +txt_tester.input_data = 'Hello\nWorld\n' + + +def test_file_writer_contextless(tmpdir): + fs, filename, services = txt_tester.get_services_for_writer(tmpdir) + + with FileWriter(path=filename).open(fs) as fp: + fp.write('Yosh!') + + with fs.open(filename) as fp: + assert fp.read() == 'Yosh!' @pytest.mark.parametrize( @@ -14,46 +27,23 @@ from bonobo.util.testing import CapturingNodeExecutionContext ] ) def test_file_writer_in_context(tmpdir, lines, output): - fs, filename = open_fs(tmpdir), 'output.txt' + fs, filename, services = txt_tester.get_services_for_writer(tmpdir) - writer = FileWriter(path=filename) - context = NodeExecutionContext(writer, services={'fs': fs}) - - context.start() - context.write(BEGIN, *map(Bag, lines), END) - for _ in range(len(lines)): - context.step() - context.stop() + with NodeExecutionContext(FileWriter(path=filename), services=services) as context: + context.write(BEGIN, *map(Bag, lines), END) + for _ in range(len(lines)): + context.step() with fs.open(filename) as fp: assert fp.read() == output -def test_file_writer_out_of_context(tmpdir): - fs, filename = open_fs(tmpdir), 'output.txt' +def test_file_reader(tmpdir): + fs, filename, services = txt_tester.get_services_for_reader(tmpdir) - writer = FileWriter(path=filename) - - with writer.open(fs) as fp: - fp.write('Yosh!') - - with fs.open(filename) as fp: - assert fp.read() == 'Yosh!' - - -def test_file_reader_in_context(tmpdir): - fs, filename = open_fs(tmpdir), 'input.txt' - - with fs.open(filename, 'w') as fp: - fp.write('Hello\nWorld\n') - - reader = FileReader(path=filename) - context = CapturingNodeExecutionContext(reader, services={'fs': fs}) - - context.start() - context.write(BEGIN, Bag(), END) - context.step() - context.stop() + with CapturingNodeExecutionContext(FileReader(path=filename), services=services) as context: + context.write(BEGIN, Bag(), END) + context.step() assert len(context.send.mock_calls) == 2 diff --git a/tests/io/test_json.py b/tests/io/test_json.py index 15d9e7e..75350ce 100644 --- a/tests/io/test_json.py +++ b/tests/io/test_json.py @@ -1,44 +1,48 @@ import pytest -from bonobo import Bag, JsonReader, JsonWriter, open_fs, settings +from bonobo import Bag, JsonReader, JsonWriter, settings from bonobo.constants import BEGIN, END from bonobo.execution.node import NodeExecutionContext -from bonobo.util.testing import CapturingNodeExecutionContext +from bonobo.util.testing import CapturingNodeExecutionContext, FilesystemTester + +json_tester = FilesystemTester('json') +json_tester.input_data = '''[{"x": "foo"},{"x": "bar"}]''' -def test_write_json_to_file(tmpdir): - fs, filename = open_fs(tmpdir), 'output.json' +def test_write_json_arg0(tmpdir): + fs, filename, services = json_tester.get_services_for_writer(tmpdir) - writer = JsonWriter(filename, ioformat=settings.IOFORMAT_ARG0) - context = NodeExecutionContext(writer, services={'fs': fs}) - - context.start() - context.write(BEGIN, Bag({'foo': 'bar'}), END) - context.step() - context.stop() + with NodeExecutionContext(JsonWriter(filename, ioformat=settings.IOFORMAT_ARG0), services=services) as context: + context.write(BEGIN, Bag({'foo': 'bar'}), END) + context.step() with fs.open(filename) as fp: assert fp.read() == '[{"foo": "bar"}]' - with pytest.raises(AttributeError): - getattr(context, 'file') - with pytest.raises(AttributeError): - getattr(context, 'first') +@pytest.mark.parametrize('add_kwargs', ({}, { + 'ioformat': settings.IOFORMAT_KWARGS, +}, )) +def test_write_json_kwargs(tmpdir, add_kwargs): + fs, filename, services = json_tester.get_services_for_writer(tmpdir) + + with NodeExecutionContext(JsonWriter(filename, **add_kwargs), services=services) as context: + context.write(BEGIN, Bag(**{'foo': 'bar'}), END) + context.step() + + with fs.open(filename) as fp: + assert fp.read() == '[{"foo": "bar"}]' -def test_read_json_from_file(tmpdir): - fs, filename = open_fs(tmpdir), 'input.json' - with fs.open(filename, 'w') as fp: - fp.write('[{"x": "foo"},{"x": "bar"}]') - reader = JsonReader(filename, ioformat=settings.IOFORMAT_ARG0) +def test_read_json_arg0(tmpdir): + fs, filename, services = json_tester.get_services_for_reader(tmpdir) - context = CapturingNodeExecutionContext(reader, services={'fs': fs}) - - context.start() - context.write(BEGIN, Bag(), END) - context.step() - context.stop() + with CapturingNodeExecutionContext( + JsonReader(filename, ioformat=settings.IOFORMAT_ARG0), + services=services, + ) as context: + context.write(BEGIN, Bag(), END) + context.step() assert len(context.send.mock_calls) == 2 diff --git a/tests/io/test_pickle.py b/tests/io/test_pickle.py index 1709b16..aff7796 100644 --- a/tests/io/test_pickle.py +++ b/tests/io/test_pickle.py @@ -2,24 +2,22 @@ import pickle import pytest -from bonobo import Bag, PickleReader, PickleWriter, open_fs, settings +from bonobo import Bag, PickleReader, PickleWriter, settings from bonobo.constants import BEGIN, END from bonobo.execution.node import NodeExecutionContext -from bonobo.util.testing import CapturingNodeExecutionContext +from bonobo.util.testing import CapturingNodeExecutionContext, FilesystemTester + +pickle_tester = FilesystemTester('pkl', mode='wb') +pickle_tester.input_data = pickle.dumps([['a', 'b', 'c'], ['a foo', 'b foo', 'c foo'], ['a bar', 'b bar', 'c bar']]) def test_write_pickled_dict_to_file(tmpdir): - fs, filename = open_fs(tmpdir), 'output.pkl' + fs, filename, services = pickle_tester.get_services_for_writer(tmpdir) - writer = PickleWriter(filename, ioformat=settings.IOFORMAT_ARG0) - context = NodeExecutionContext(writer, services={'fs': fs}) - - context.write(BEGIN, Bag({'foo': 'bar'}), Bag({'foo': 'baz', 'ignore': 'this'}), END) - - context.start() - context.step() - context.step() - context.stop() + with NodeExecutionContext(PickleWriter(filename, ioformat=settings.IOFORMAT_ARG0), services=services) as context: + context.write(BEGIN, Bag({'foo': 'bar'}), Bag({'foo': 'baz', 'ignore': 'this'}), END) + context.step() + context.step() with fs.open(filename, 'rb') as fp: assert pickle.loads(fp.read()) == {'foo': 'bar'} @@ -29,18 +27,13 @@ def test_write_pickled_dict_to_file(tmpdir): def test_read_pickled_list_from_file(tmpdir): - fs, filename = open_fs(tmpdir), 'input.pkl' - with fs.open(filename, 'wb') as fp: - fp.write(pickle.dumps([['a', 'b', 'c'], ['a foo', 'b foo', 'c foo'], ['a bar', 'b bar', 'c bar']])) + fs, filename, services = pickle_tester.get_services_for_reader(tmpdir) - reader = PickleReader(filename, ioformat=settings.IOFORMAT_ARG0) - - context = CapturingNodeExecutionContext(reader, services={'fs': fs}) - - context.start() - context.write(BEGIN, Bag(), END) - context.step() - context.stop() + with CapturingNodeExecutionContext( + PickleReader(filename, ioformat=settings.IOFORMAT_ARG0), services=services + ) as context: + context.write(BEGIN, Bag(), END) + context.step() assert len(context.send.mock_calls) == 2