Support line-delimited JSON
Add `LdjsonReader` and `LdjsonWriter` nodes for reading and writing line-delimited JSON (one JSON document per line). See https://en.wikipedia.org/wiki/JSON_Streaming
This commit is contained in:
@ -2,6 +2,7 @@ import logging
|
||||
|
||||
from bonobo.nodes import CsvReader, CsvWriter, FileReader, FileWriter, Filter, JsonReader, JsonWriter, Limit, \
|
||||
PickleReader, PickleWriter, PrettyPrinter, RateLimited, Tee, arg0_to_kwargs, count, identity, kwargs_to_arg0, noop
|
||||
from bonobo.nodes import LdjsonReader, LdjsonWriter
|
||||
from bonobo.strategies import create_strategy
|
||||
from bonobo.structs import Bag, ErrorBag, Graph, Token
|
||||
from bonobo.util import get_name
|
||||
@ -110,6 +111,8 @@ register_api_group(
|
||||
Filter,
|
||||
JsonReader,
|
||||
JsonWriter,
|
||||
LdjsonReader,
|
||||
LdjsonWriter,
|
||||
Limit,
|
||||
PickleReader,
|
||||
PickleWriter,
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
""" Readers and writers for common file formats. """
|
||||
|
||||
from .file import FileReader, FileWriter
|
||||
from .json import JsonReader, JsonWriter
|
||||
from .json import JsonReader, JsonWriter, LdjsonReader, LdjsonWriter
|
||||
from .csv import CsvReader, CsvWriter
|
||||
from .pickle import PickleReader, PickleWriter
|
||||
|
||||
@ -12,6 +12,8 @@ __all__ = [
|
||||
'FileWriter',
|
||||
'JsonReader',
|
||||
'JsonWriter',
|
||||
'LdjsonReader',
|
||||
'LdjsonWriter',
|
||||
'PickleReader',
|
||||
'PickleWriter',
|
||||
]
|
||||
|
||||
@ -45,3 +45,21 @@ class JsonWriter(FileWriter, JsonHandler):
|
||||
self._write_line(file, (self.eol if lineno.value else '') + json.dumps(row))
|
||||
lineno += 1
|
||||
return NOT_MODIFIED
|
||||
|
||||
|
||||
class LdjsonReader(FileReader):
    """Read a stream of JSON objects, one object per line.

    Every line of the opened file is decoded on its own with ``loader`` and
    the decoded value is yielded, so the file is consumed lazily instead of
    being parsed as a single JSON document.
    """

    # Decoder applied to each line. Wrapped in ``staticmethod`` so that the
    # function is not bound to the instance when looked up through ``self``.
    loader = staticmethod(json.loads)

    def read(self, fs, file):
        """Yield one decoded JSON value per line of ``file``.

        :param fs: filesystem argument supplied by the caller; not used here.
        :param file: open file-like object, iterated line by line.
        :raises ValueError: propagated from ``loader`` when a line is not
            valid JSON (``json.JSONDecodeError`` is a ``ValueError``).
        """
        for line in file:
            yield self.loader(line)
|
||||
|
||||
|
||||
class LdjsonWriter(FileWriter):
    """Write each incoming row as one JSON document on its own line."""

    def write(self, fs, file, lineno, **row):
        """Append ``row`` to ``file`` as a JSON object followed by a newline.

        :param fs: filesystem argument supplied by the caller; not used here.
        :param file: open file-like object that receives the encoded row.
        :param lineno: line counter supplied by the caller, bumped by one
            for every row. NOTE(review): presumably a mutable counter object
            (``JsonWriter`` reads ``lineno.value``) -- confirm.
        :param row: the row's fields, collected from keyword arguments.
        :return: ``NOT_MODIFIED``.
        """
        lineno += 1
        encoded_row = json.dumps(row)
        file.write(encoded_row + '\n')
        return NOT_MODIFIED
|
||||
|
||||
@ -12,6 +12,11 @@ Command line
|
||||
|
||||
* `bonobo download /examples/datasets/coffeeshops.txt` now downloads the coffeeshops example
|
||||
|
||||
Graphs and Nodes
|
||||
................
|
||||
|
||||
* New `LdjsonReader` and `LdjsonWriter` nodes for handling `line-delimited JSON <https://en.wikipedia.org/wiki/JSON_Streaming>`_.
|
||||
|
||||
v.0.5.0 - 5 October 2017
|
||||
::::::::::::::::::::::::
|
||||
|
||||
|
||||
@ -1,8 +1,9 @@
|
||||
import pytest
|
||||
|
||||
from bonobo import JsonReader, JsonWriter, settings
|
||||
from bonobo import LdjsonReader, LdjsonWriter
|
||||
from bonobo.execution.node import NodeExecutionContext
|
||||
from bonobo.util.testing import FilesystemTester
|
||||
from bonobo.util.testing import FilesystemTester, BufferingNodeExecutionContext
|
||||
|
||||
json_tester = FilesystemTester('json')
|
||||
json_tester.input_data = '''[{"x": "foo"},{"x": "bar"}]'''
|
||||
@ -32,3 +33,32 @@ def test_write_json_kwargs(tmpdir, add_kwargs):
|
||||
|
||||
with fs.open(filename) as fp:
|
||||
assert fp.read() == '[{"foo": "bar"}]'
|
||||
|
||||
|
||||
# Fixture for the line-delimited JSON tests: two JSON objects separated by a
# newline, with no trailing newline after the last one.
stream_json_tester = FilesystemTester('json')
stream_json_tester.input_data = '''{"foo": "bar"}\n{"baz": "boz"}'''
|
||||
|
||||
|
||||
def test_read_stream_json(tmpdir):
    """LdjsonReader should yield one decoded object per input line."""
    fs, filename, services = stream_json_tester.get_services_for_reader(tmpdir)

    reader = LdjsonReader(filename)
    with BufferingNodeExecutionContext(reader, services=services) as context:
        # A single empty input triggers one pass over the file.
        context.write_sync(tuple())
        actual = context.get_buffer()

    expected = [{"foo": "bar"}, {"baz": "boz"}]
    assert expected == actual
|
||||
|
||||
|
||||
def test_write_stream_json(tmpdir):
    """LdjsonWriter should emit every row as one JSON document per line."""
    # Use the writer fixture rather than the reader one, so the test does not
    # run against a file that was pre-seeded with the reader's input data
    # (which is nearly identical to the expected output).
    fs, filename, services = stream_json_tester.get_services_for_writer(tmpdir)

    with BufferingNodeExecutionContext(LdjsonWriter(filename),
                                       services=services) as context:
        context.write_sync({'foo': 'bar'})
        context.write_sync({'baz': 'boz'})

    # Each row is terminated by a newline, including the last one.
    expected = '''{"foo": "bar"}\n{"baz": "boz"}\n'''
    with fs.open(filename) as fin:
        actual = fin.read()
    assert expected == actual
|
||||
|
||||
Reference in New Issue
Block a user