Merge pull request #115 from hartym/master

Master
This commit is contained in:
Romain Dorgueil
2017-06-17 11:49:26 +02:00
committed by GitHub
18 changed files with 285 additions and 230 deletions

9
.gitignore vendored
View File

@ -2,6 +2,7 @@
*,cover
*.egg
*.egg-info/
*.iml
*.log
*.manifest
*.mo
@ -20,25 +21,17 @@
.installed.cfg
.ipynb_checkpoints
.python-version
.tox/
.webassets-cache
/.idea
/.release
/bonobo.iml
/bonobo/examples/work_in_progress/
/bonobo/ext/jupyter/js/node_modules/
/build/
/coverage.xml
/develop-eggs/
/dist/
/docs/_build/
/downloads/
/eggs/
/examples/private
/htmlcov/
/sdist/
/tags
celerybeat-schedule
parts/
pip-delete-this-directory.txt
pip-log.txt

View File

@ -68,7 +68,7 @@ register_api(create_strategy)
# Shortcut to filesystem2's open_fs, that we make available there for convenience.
@register_api
def open_fs(fs_url, *args, **kwargs):
def open_fs(fs_url=None, *args, **kwargs):
"""
Wraps :func:`fs.open_fs` function with a few candies.
@ -83,6 +83,10 @@ def open_fs(fs_url, *args, **kwargs):
"""
from fs import open_fs as _open_fs
from os.path import expanduser
from os import getcwd
if fs_url is None:
fs_url = getcwd()
return _open_fs(expanduser(str(fs_url)), *args, **kwargs)

View File

@ -1,7 +1,7 @@
from bonobo.config.configurables import Configurable
from bonobo.config.options import Option, Method
from bonobo.config.options import Method, Option
from bonobo.config.processors import ContextProcessor
from bonobo.config.services import Container, Service, Exclusive
from bonobo.config.services import Container, Exclusive, Service, requires
# bonobo.config public programming interface
__all__ = [
@ -12,4 +12,5 @@ __all__ = [
'Method',
'Option',
'Service',
'requires',
]

View File

@ -56,7 +56,11 @@ class Service(Option):
inst.__options_values__[self.name] = validate_service_name(value)
def resolve(self, inst, services):
return services.get(getattr(inst, self.name))
try:
name = getattr(inst, self.name)
except AttributeError:
name = self.name
return services.get(name)
class Container(dict):
@ -126,3 +130,19 @@ class Exclusive(ContextDecorator):
def __exit__(self, *exc):
self.get_lock().release()
def requires(*service_names):
def decorate(mixed):
try:
options = mixed.__options__
except AttributeError:
mixed.__options__ = options = {}
for service_name in service_names:
service = Service(service_name)
service.name = service_name
options[service_name] = service
return mixed
return decorate

View File

@ -58,6 +58,13 @@ class LoopingExecutionContext(Wrapper):
# XXX enhancers
self._enhancers = get_enhancers(self.wrapped)
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type=None, exc_val=None, exc_tb=None):
self.stop()
def start(self):
if self.started:
raise RuntimeError('Cannot start a node twice ({}).'.format(get_name(self)))

View File

@ -71,5 +71,8 @@ def get_logger(name='bonobo'):
return logging.getLogger(name)
# Compatibility with python logging
getLogger = get_logger
# Setup formating and level.
setup(level=settings.LOGGING_LEVEL)

82
bonobo/nodes/io/base.py Normal file
View File

@ -0,0 +1,82 @@
from bonobo import settings
from bonobo.config import Configurable, ContextProcessor, Option, Service
from bonobo.structs.bags import Bag
class IOFormatEnabled(Configurable):
ioformat = Option(default=settings.IOFORMAT.get)
def get_input(self, *args, **kwargs):
if self.ioformat == settings.IOFORMAT_ARG0:
if len(args) != 1 or len(kwargs):
raise ValueError(
'Wrong input formating: IOFORMAT=ARG0 implies one arg and no kwargs, got args={!r} and kwargs={!r}.'.
format(args, kwargs)
)
return args[0]
if self.ioformat == settings.IOFORMAT_KWARGS:
if len(args) or not len(kwargs):
raise ValueError(
'Wrong input formating: IOFORMAT=KWARGS ioformat implies no arg, got args={!r} and kwargs={!r}.'.
format(args, kwargs)
)
return kwargs
raise NotImplementedError('Unsupported format.')
def get_output(self, row):
if self.ioformat == settings.IOFORMAT_ARG0:
return row
if self.ioformat == settings.IOFORMAT_KWARGS:
return Bag(**row)
raise NotImplementedError('Unsupported format.')
class FileHandler(Configurable):
"""Abstract component factory for file-related components.
Args:
path (str): which path to use within the provided filesystem.
eol (str): which character to use to separate lines.
mode (str): which mode to use when opening the file.
fs (str): service name to use for filesystem.
"""
path = Option(str, required=True, positional=True) # type: str
eol = Option(str, default='\n') # type: str
mode = Option(str) # type: str
encoding = Option(str, default='utf-8') # type: str
fs = Service('fs') # type: str
@ContextProcessor
def file(self, context, fs):
with self.open(fs) as file:
yield file
def open(self, fs):
return fs.open(self.path, self.mode, encoding=self.encoding)
class Reader:
"""Abstract component factory for readers.
"""
def __call__(self, *args, **kwargs):
yield from self.read(*args, **kwargs)
def read(self, *args, **kwargs):
raise NotImplementedError('Abstract.')
class Writer:
"""Abstract component factory for writers.
"""
def __call__(self, *args, **kwargs):
return self.write(*args, **kwargs)
def write(self, *args, **kwargs):
raise NotImplementedError('Abstract.')

View File

@ -3,7 +3,8 @@ import csv
from bonobo.config import Option
from bonobo.config.processors import ContextProcessor
from bonobo.constants import NOT_MODIFIED
from bonobo.nodes.io.file import FileHandler, FileReader, FileWriter
from bonobo.nodes.io.file import FileReader, FileWriter
from bonobo.nodes.io.base import FileHandler, IOFormatEnabled
from bonobo.util.objects import ValueHolder
@ -28,7 +29,7 @@ class CsvHandler(FileHandler):
headers = Option(tuple)
class CsvReader(CsvHandler, FileReader):
class CsvReader(IOFormatEnabled, FileReader, CsvHandler):
"""
Reads a CSV and yield the values as dicts.
@ -64,7 +65,7 @@ class CsvReader(CsvHandler, FileReader):
yield self.get_output(dict(zip(_headers, row)))
class CsvWriter(CsvHandler, FileWriter):
class CsvWriter(IOFormatEnabled, FileWriter, CsvHandler):
@ContextProcessor
def writer(self, context, fs, file, lineno):
writer = csv.writer(file, delimiter=self.delimiter, quotechar=self.quotechar, lineterminator=self.eol)

View File

@ -1,81 +1,11 @@
from bonobo import settings
from bonobo.config import Option, Service
from bonobo.config.configurables import Configurable
from bonobo.config import Option
from bonobo.config.processors import ContextProcessor
from bonobo.constants import NOT_MODIFIED
from bonobo.structs.bags import Bag
from bonobo.nodes.io.base import FileHandler, Reader, Writer
from bonobo.util.objects import ValueHolder
class FileHandler(Configurable):
"""Abstract component factory for file-related components.
Args:
path (str): which path to use within the provided filesystem.
eol (str): which character to use to separate lines.
mode (str): which mode to use when opening the file.
fs (str): service name to use for filesystem.
"""
path = Option(str, required=True, positional=True) # type: str
eol = Option(str, default='\n') # type: str
mode = Option(str) # type: str
encoding = Option(str, default='utf-8') # type: str
fs = Service('fs') # type: str
ioformat = Option(default=settings.IOFORMAT.get)
@ContextProcessor
def file(self, context, fs):
with self.open(fs) as file:
yield file
def open(self, fs):
return fs.open(self.path, self.mode, encoding=self.encoding)
def get_input(self, *args, **kwargs):
if self.ioformat == settings.IOFORMAT_ARG0:
assert len(args) == 1 and not len(kwargs), 'ARG0 format implies one arg and no kwargs.'
return args[0]
if self.ioformat == settings.IOFORMAT_KWARGS:
assert len(args) == 0 and len(kwargs), 'KWARGS format implies no arg.'
return kwargs
raise NotImplementedError('Unsupported format.')
def get_output(self, row):
if self.ioformat == settings.IOFORMAT_ARG0:
return row
if self.ioformat == settings.IOFORMAT_KWARGS:
return Bag(**row)
raise NotImplementedError('Unsupported format.')
class Reader(FileHandler):
"""Abstract component factory for readers.
"""
def __call__(self, *args, **kwargs):
yield from self.read(*args, **kwargs)
def read(self, *args, **kwargs):
raise NotImplementedError('Abstract.')
class Writer(FileHandler):
"""Abstract component factory for writers.
"""
def __call__(self, *args, **kwargs):
return self.write(*args)
def write(self, *args, **kwargs):
raise NotImplementedError('Abstract.')
class FileReader(Reader):
class FileReader(Reader, FileHandler):
"""Component factory for file-like readers.
On its own, it can be used to read a file and yield one row per line, trimming the "eol" character at the end if
@ -93,7 +23,7 @@ class FileReader(Reader):
yield line.rstrip(self.eol)
class FileWriter(Writer):
class FileWriter(Writer, FileHandler):
"""Component factory for file or file-like writers.
On its own, it can be used to write in a file one line per row that comes into this component. Extending it is
@ -107,11 +37,11 @@ class FileWriter(Writer):
lineno = ValueHolder(0)
yield lineno
def write(self, fs, file, lineno, row):
def write(self, fs, file, lineno, line):
"""
Write a row on the next line of opened file in context.
"""
self._write_line(file, (self.eol if lineno.value else '') + row)
self._write_line(file, (self.eol if lineno.value else '') + line)
lineno += 1
return NOT_MODIFIED

View File

@ -1,15 +1,17 @@
import json
from bonobo.config.processors import ContextProcessor
from bonobo.nodes.io.file import FileWriter, FileReader
from bonobo.constants import NOT_MODIFIED
from bonobo.nodes.io.base import FileHandler, IOFormatEnabled
from bonobo.nodes.io.file import FileReader, FileWriter
class JsonHandler():
class JsonHandler(FileHandler):
eol = ',\n'
prefix, suffix = '[', ']'
class JsonReader(JsonHandler, FileReader):
class JsonReader(IOFormatEnabled, FileReader, JsonHandler):
loader = staticmethod(json.load)
def read(self, fs, file):
@ -17,18 +19,21 @@ class JsonReader(JsonHandler, FileReader):
yield self.get_output(line)
class JsonWriter(JsonHandler, FileWriter):
class JsonWriter(IOFormatEnabled, FileWriter, JsonHandler):
@ContextProcessor
def envelope(self, context, fs, file, lineno):
file.write(self.prefix)
yield
file.write(self.suffix)
def write(self, fs, file, lineno, row):
def write(self, fs, file, lineno, *args, **kwargs):
"""
Write a json row on the next line of file pointed by ctx.file.
:param ctx:
:param row:
"""
return super().write(fs, file, lineno, json.dumps(row))
row = self.get_input(*args, **kwargs)
self._write_line(file, (self.eol if lineno.value else '') + json.dumps(row))
lineno += 1
return NOT_MODIFIED

View File

@ -1,10 +1,11 @@
import pickle
from bonobo.config.processors import ContextProcessor
from bonobo.config import Option
from bonobo.config.processors import ContextProcessor
from bonobo.constants import NOT_MODIFIED
from bonobo.nodes.io.base import FileHandler, IOFormatEnabled
from bonobo.nodes.io.file import FileReader, FileWriter
from bonobo.util.objects import ValueHolder
from .file import FileReader, FileWriter, FileHandler
class PickleHandler(FileHandler):
@ -19,7 +20,7 @@ class PickleHandler(FileHandler):
item_names = Option(tuple)
class PickleReader(PickleHandler, FileReader):
class PickleReader(IOFormatEnabled, FileReader, PickleHandler):
"""
Reads a Python pickle object and yields the items in dicts.
"""
@ -56,8 +57,7 @@ class PickleReader(PickleHandler, FileReader):
yield self.get_output(dict(zip(i)) if is_dict else dict(zip(pickle_headers.value, i)))
class PickleWriter(PickleHandler, FileWriter):
class PickleWriter(IOFormatEnabled, FileWriter, PickleHandler):
mode = Option(str, default='wb')
def write(self, fs, file, lineno, item):

View File

@ -1,6 +1,5 @@
import time
import traceback
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from bonobo.constants import BEGIN, END
@ -29,19 +28,16 @@ class ExecutorStrategy(Strategy):
futures = []
for plugin_context in context.plugins:
def _runner(plugin_context=plugin_context):
try:
plugin_context.start()
plugin_context.loop()
plugin_context.stop()
except Exception as exc:
print_error(exc, traceback.format_exc(), context=plugin_context)
with plugin_context:
try:
plugin_context.loop()
except Exception as exc:
print_error(exc, traceback.format_exc(), context=plugin_context)
futures.append(executor.submit(_runner))
for node_context in context.nodes:
def _runner(node_context=node_context):
try:
node_context.start()

View File

@ -1,6 +1,7 @@
from contextlib import contextmanager
from unittest.mock import MagicMock
from bonobo import open_fs
from bonobo.execution.node import NodeExecutionContext
@ -17,3 +18,20 @@ def optional_contextmanager(cm, *, ignore=False):
else:
with cm:
yield
class FilesystemTester:
def __init__(self, extension='txt', mode='w'):
self.extension = extension
self.input_data = ''
self.mode = mode
def get_services_for_reader(self, tmpdir):
fs, filename = open_fs(tmpdir), 'input.' + self.extension
with fs.open(filename, self.mode) as fp:
fp.write(self.input_data)
return fs, filename, {'fs': fs}
def get_services_for_writer(self, tmpdir):
fs, filename = open_fs(tmpdir), 'output.' + self.extension
return fs, filename, {'fs': fs}

View File

View File

@ -1,23 +1,21 @@
import pytest
from bonobo import Bag, CsvReader, CsvWriter, open_fs, settings
from bonobo import Bag, CsvReader, CsvWriter, settings
from bonobo.constants import BEGIN, END
from bonobo.execution.node import NodeExecutionContext
from bonobo.util.testing import CapturingNodeExecutionContext
from bonobo.util.testing import CapturingNodeExecutionContext, FilesystemTester
csv_tester = FilesystemTester('csv')
csv_tester.input_data = 'a,b,c\na foo,b foo,c foo\na bar,b bar,c bar'
def test_write_csv_to_file(tmpdir):
fs, filename = open_fs(tmpdir), 'output.csv'
def test_write_csv_to_file_arg0(tmpdir):
fs, filename, services = csv_tester.get_services_for_writer(tmpdir)
writer = CsvWriter(path=filename, ioformat=settings.IOFORMAT_ARG0)
context = NodeExecutionContext(writer, services={'fs': fs})
context.write(BEGIN, Bag({'foo': 'bar'}), Bag({'foo': 'baz', 'ignore': 'this'}), END)
context.start()
context.step()
context.step()
context.stop()
with NodeExecutionContext(CsvWriter(path=filename, ioformat=settings.IOFORMAT_ARG0), services=services) as context:
context.write(BEGIN, Bag({'foo': 'bar'}), Bag({'foo': 'baz', 'ignore': 'this'}), END)
context.step()
context.step()
with fs.open(filename) as fp:
assert fp.read() == 'foo\nbar\nbaz\n'
@ -26,19 +24,33 @@ def test_write_csv_to_file(tmpdir):
getattr(context, 'file')
def test_read_csv_from_file(tmpdir):
fs, filename = open_fs(tmpdir), 'input.csv'
with fs.open(filename, 'w') as fp:
fp.write('a,b,c\na foo,b foo,c foo\na bar,b bar,c bar')
@pytest.mark.parametrize('add_kwargs', ({}, {
'ioformat': settings.IOFORMAT_KWARGS,
},))
def test_write_csv_to_file_kwargs(tmpdir, add_kwargs):
fs, filename, services = csv_tester.get_services_for_writer(tmpdir)
reader = CsvReader(path=filename, delimiter=',', ioformat=settings.IOFORMAT_ARG0)
with NodeExecutionContext(CsvWriter(path=filename, **add_kwargs), services=services) as context:
context.write(BEGIN, Bag(**{'foo': 'bar'}), Bag(**{'foo': 'baz', 'ignore': 'this'}), END)
context.step()
context.step()
context = CapturingNodeExecutionContext(reader, services={'fs': fs})
with fs.open(filename) as fp:
assert fp.read() == 'foo\nbar\nbaz\n'
context.start()
context.write(BEGIN, Bag(), END)
context.step()
context.stop()
with pytest.raises(AttributeError):
getattr(context, 'file')
def test_read_csv_from_file_arg0(tmpdir):
fs, filename, services = csv_tester.get_services_for_reader(tmpdir)
with CapturingNodeExecutionContext(
CsvReader(path=filename, delimiter=',', ioformat=settings.IOFORMAT_ARG0),
services=services,
) as context:
context.write(BEGIN, Bag(), END)
context.step()
assert len(context.send.mock_calls) == 2
@ -59,19 +71,15 @@ def test_read_csv_from_file(tmpdir):
}
def test_read_csv_kwargs_output_formater(tmpdir):
fs, filename = open_fs(tmpdir), 'input.csv'
with fs.open(filename, 'w') as fp:
fp.write('a,b,c\na foo,b foo,c foo\na bar,b bar,c bar')
def test_read_csv_from_file_kwargs(tmpdir):
fs, filename, services = csv_tester.get_services_for_reader(tmpdir)
reader = CsvReader(path=filename, delimiter=',')
context = CapturingNodeExecutionContext(reader, services={'fs': fs})
context.start()
context.write(BEGIN, Bag(), END)
context.step()
context.stop()
with CapturingNodeExecutionContext(
CsvReader(path=filename, delimiter=','),
services=services,
) as context:
context.write(BEGIN, Bag(), END)
context.step()
assert len(context.send.mock_calls) == 2

View File

@ -1,9 +1,22 @@
import pytest
from bonobo import Bag, FileReader, FileWriter, open_fs
from bonobo import Bag, FileReader, FileWriter
from bonobo.constants import BEGIN, END
from bonobo.execution.node import NodeExecutionContext
from bonobo.util.testing import CapturingNodeExecutionContext
from bonobo.util.testing import CapturingNodeExecutionContext, FilesystemTester
txt_tester = FilesystemTester('txt')
txt_tester.input_data = 'Hello\nWorld\n'
def test_file_writer_contextless(tmpdir):
fs, filename, services = txt_tester.get_services_for_writer(tmpdir)
with FileWriter(path=filename).open(fs) as fp:
fp.write('Yosh!')
with fs.open(filename) as fp:
assert fp.read() == 'Yosh!'
@pytest.mark.parametrize(
@ -14,46 +27,23 @@ from bonobo.util.testing import CapturingNodeExecutionContext
]
)
def test_file_writer_in_context(tmpdir, lines, output):
fs, filename = open_fs(tmpdir), 'output.txt'
fs, filename, services = txt_tester.get_services_for_writer(tmpdir)
writer = FileWriter(path=filename)
context = NodeExecutionContext(writer, services={'fs': fs})
context.start()
context.write(BEGIN, *map(Bag, lines), END)
for _ in range(len(lines)):
context.step()
context.stop()
with NodeExecutionContext(FileWriter(path=filename), services=services) as context:
context.write(BEGIN, *map(Bag, lines), END)
for _ in range(len(lines)):
context.step()
with fs.open(filename) as fp:
assert fp.read() == output
def test_file_writer_out_of_context(tmpdir):
fs, filename = open_fs(tmpdir), 'output.txt'
def test_file_reader(tmpdir):
fs, filename, services = txt_tester.get_services_for_reader(tmpdir)
writer = FileWriter(path=filename)
with writer.open(fs) as fp:
fp.write('Yosh!')
with fs.open(filename) as fp:
assert fp.read() == 'Yosh!'
def test_file_reader_in_context(tmpdir):
fs, filename = open_fs(tmpdir), 'input.txt'
with fs.open(filename, 'w') as fp:
fp.write('Hello\nWorld\n')
reader = FileReader(path=filename)
context = CapturingNodeExecutionContext(reader, services={'fs': fs})
context.start()
context.write(BEGIN, Bag(), END)
context.step()
context.stop()
with CapturingNodeExecutionContext(FileReader(path=filename), services=services) as context:
context.write(BEGIN, Bag(), END)
context.step()
assert len(context.send.mock_calls) == 2

View File

@ -1,44 +1,48 @@
import pytest
from bonobo import Bag, JsonReader, JsonWriter, open_fs, settings
from bonobo import Bag, JsonReader, JsonWriter, settings
from bonobo.constants import BEGIN, END
from bonobo.execution.node import NodeExecutionContext
from bonobo.util.testing import CapturingNodeExecutionContext
from bonobo.util.testing import CapturingNodeExecutionContext, FilesystemTester
json_tester = FilesystemTester('json')
json_tester.input_data = '''[{"x": "foo"},{"x": "bar"}]'''
def test_write_json_to_file(tmpdir):
fs, filename = open_fs(tmpdir), 'output.json'
def test_write_json_arg0(tmpdir):
fs, filename, services = json_tester.get_services_for_writer(tmpdir)
writer = JsonWriter(filename, ioformat=settings.IOFORMAT_ARG0)
context = NodeExecutionContext(writer, services={'fs': fs})
context.start()
context.write(BEGIN, Bag({'foo': 'bar'}), END)
context.step()
context.stop()
with NodeExecutionContext(JsonWriter(filename, ioformat=settings.IOFORMAT_ARG0), services=services) as context:
context.write(BEGIN, Bag({'foo': 'bar'}), END)
context.step()
with fs.open(filename) as fp:
assert fp.read() == '[{"foo": "bar"}]'
with pytest.raises(AttributeError):
getattr(context, 'file')
with pytest.raises(AttributeError):
getattr(context, 'first')
@pytest.mark.parametrize('add_kwargs', ({}, {
'ioformat': settings.IOFORMAT_KWARGS,
}, ))
def test_write_json_kwargs(tmpdir, add_kwargs):
fs, filename, services = json_tester.get_services_for_writer(tmpdir)
with NodeExecutionContext(JsonWriter(filename, **add_kwargs), services=services) as context:
context.write(BEGIN, Bag(**{'foo': 'bar'}), END)
context.step()
with fs.open(filename) as fp:
assert fp.read() == '[{"foo": "bar"}]'
def test_read_json_from_file(tmpdir):
fs, filename = open_fs(tmpdir), 'input.json'
with fs.open(filename, 'w') as fp:
fp.write('[{"x": "foo"},{"x": "bar"}]')
reader = JsonReader(filename, ioformat=settings.IOFORMAT_ARG0)
def test_read_json_arg0(tmpdir):
fs, filename, services = json_tester.get_services_for_reader(tmpdir)
context = CapturingNodeExecutionContext(reader, services={'fs': fs})
context.start()
context.write(BEGIN, Bag(), END)
context.step()
context.stop()
with CapturingNodeExecutionContext(
JsonReader(filename, ioformat=settings.IOFORMAT_ARG0),
services=services,
) as context:
context.write(BEGIN, Bag(), END)
context.step()
assert len(context.send.mock_calls) == 2

View File

@ -2,24 +2,22 @@ import pickle
import pytest
from bonobo import Bag, PickleReader, PickleWriter, open_fs, settings
from bonobo import Bag, PickleReader, PickleWriter, settings
from bonobo.constants import BEGIN, END
from bonobo.execution.node import NodeExecutionContext
from bonobo.util.testing import CapturingNodeExecutionContext
from bonobo.util.testing import CapturingNodeExecutionContext, FilesystemTester
pickle_tester = FilesystemTester('pkl', mode='wb')
pickle_tester.input_data = pickle.dumps([['a', 'b', 'c'], ['a foo', 'b foo', 'c foo'], ['a bar', 'b bar', 'c bar']])
def test_write_pickled_dict_to_file(tmpdir):
fs, filename = open_fs(tmpdir), 'output.pkl'
fs, filename, services = pickle_tester.get_services_for_writer(tmpdir)
writer = PickleWriter(filename, ioformat=settings.IOFORMAT_ARG0)
context = NodeExecutionContext(writer, services={'fs': fs})
context.write(BEGIN, Bag({'foo': 'bar'}), Bag({'foo': 'baz', 'ignore': 'this'}), END)
context.start()
context.step()
context.step()
context.stop()
with NodeExecutionContext(PickleWriter(filename, ioformat=settings.IOFORMAT_ARG0), services=services) as context:
context.write(BEGIN, Bag({'foo': 'bar'}), Bag({'foo': 'baz', 'ignore': 'this'}), END)
context.step()
context.step()
with fs.open(filename, 'rb') as fp:
assert pickle.loads(fp.read()) == {'foo': 'bar'}
@ -29,18 +27,13 @@ def test_write_pickled_dict_to_file(tmpdir):
def test_read_pickled_list_from_file(tmpdir):
fs, filename = open_fs(tmpdir), 'input.pkl'
with fs.open(filename, 'wb') as fp:
fp.write(pickle.dumps([['a', 'b', 'c'], ['a foo', 'b foo', 'c foo'], ['a bar', 'b bar', 'c bar']]))
fs, filename, services = pickle_tester.get_services_for_reader(tmpdir)
reader = PickleReader(filename, ioformat=settings.IOFORMAT_ARG0)
context = CapturingNodeExecutionContext(reader, services={'fs': fs})
context.start()
context.write(BEGIN, Bag(), END)
context.step()
context.stop()
with CapturingNodeExecutionContext(
PickleReader(filename, ioformat=settings.IOFORMAT_ARG0), services=services
) as context:
context.write(BEGIN, Bag(), END)
context.step()
assert len(context.send.mock_calls) == 2