Skip to content

Commit 5ec52e0

Browse files
committed
feat: jsongz export/import
1 parent ecf1dd6 commit 5ec52e0

File tree

2 files changed

+123
-8
lines changed

2 files changed

+123
-8
lines changed

rethinkdb/_export.py

100755100644
Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import tempfile
3434
import time
3535
import traceback
36+
import zlib
3637
from multiprocessing.queues import SimpleQueue
3738

3839
import six
@@ -48,7 +49,7 @@
4849

4950
usage = """rethinkdb export [-c HOST:PORT] [-p] [--password-file FILENAME] [--tls-cert filename] [-d DIR]
5051
[-e (DB | DB.TABLE)]...
51-
[--format (csv | json | ndjson)] [--fields FIELD,FIELD...] [--delimiter CHARACTER]
52+
[--format (csv | json | ndjson | jsongz)] [--fields FIELD,FIELD...] [--delimiter CHARACTER]
5253
[--clients NUM]"""
5354
help_description = (
5455
"`rethinkdb export` exports data from a RethinkDB cluster into a directory"
@@ -118,11 +119,11 @@ def parse_options(argv, prog=None):
118119
parser.add_option(
119120
"--format",
120121
dest="format",
121-
metavar="json|csv|ndjson",
122+
metavar="json|csv|ndjson|jsongz",
122123
default="json",
123124
help="format to write (defaults to json. ndjson is newline delimited json.)",
124125
type="choice",
125-
choices=["json", "csv", "ndjson"],
126+
choices=["json", "csv", "ndjson", "jsongz"],
126127
)
127128
parser.add_option(
128129
"--clients",
@@ -150,6 +151,17 @@ def parse_options(argv, prog=None):
150151
)
151152
parser.add_option_group(csvGroup)
152153

154+
jsongzGroup = optparse.OptionGroup(parser, "jsongz options")
155+
jsongzGroup.add_option(
156+
"--compression-level",
157+
dest="compression_level",
158+
metavar="NUM",
159+
default=None,
160+
help="compression level, an integer from 0 to 9 (default: -1, zlib's default compression)",
161+
type="int",
162+
)
163+
parser.add_option_group(jsongzGroup)
164+
153165
options, args = parser.parse_args(argv)
154166

155167
# -- Check validity of arguments
@@ -185,6 +197,15 @@ def parse_options(argv, prog=None):
185197
if options.delimiter:
186198
parser.error("--delimiter option is only valid for CSV file formats")
187199

200+
if options.format == "jsongz":
201+
if options.compression_level is None:
202+
options.compression_level = -1
203+
elif options.compression_level < 0 or options.compression_level > 9:
204+
parser.error("--compression-level must be an integer between 0 and 9")
205+
else:
206+
if options.compression_level:
207+
parser.error("--compression-level option is only valid for jsongz file formats")
208+
188209
# -
189210

190211
return options
@@ -226,6 +247,43 @@ def json_writer(filename, fields, task_queue, error_queue, format):
226247
pass
227248

228249

250+
def json_gz_writer(filename, fields, task_queue, error_queue, format, compression_level):
    """Writer process: drain rows from task_queue into a gzip-compressed JSON array file.

    filename          -- path of the .jsongz file to create
    fields            -- optional list of field names; when given, all other keys
                         are stripped from each row before writing
    task_queue        -- queue of (row,) tuples, terminated by a StopIteration instance
    error_queue       -- queue receiving (type, value, traceback) on failure
    format            -- unused; kept for signature parity with json_writer
    compression_level -- zlib level 0-9, or -1 for the zlib default
    """
    try:
        with open(filename, "wb") as out:
            # wbits 31 = MAX_WBITS + gzip header and trailer, so the output is
            # a standard .gz stream readable by the gzip module / gunzip
            compressor = zlib.compressobj(compression_level, zlib.DEFLATED, 31)

            def compress_write(text):
                out.write(compressor.compress(text.encode("utf-8")))

            first = True
            compress_write("[")
            item = task_queue.get()
            while not isinstance(item, StopIteration):
                row = item[0]
                if fields is not None:
                    # drop keys not in the requested field list; iterate a copy
                    # of the keys since we delete from the dict while looping
                    for key in list(row.keys()):
                        if key not in fields:
                            del row[key]
                if first:
                    compress_write("\n")
                    first = False
                else:
                    compress_write(",\n")

                compress_write(json.dumps(row))
                item = task_queue.get()

            compress_write("\n]\n")
            # emit whatever the compressor still buffers, plus the gzip trailer
            out.write(compressor.flush())
    except BaseException:
        ex_type, ex_class, tb = sys.exc_info()
        error_queue.put((ex_type, ex_class, traceback.extract_tb(tb)))

        # Read until the exit task so the readers do not hang on pushing onto the queue
        while not isinstance(task_queue.get(), StopIteration):
            pass
229287
def csv_writer(filename, fields, delimiter, task_queue, error_queue):
230288
try:
231289
with open(filename, "w") as out:
@@ -331,6 +389,19 @@ def export_table(
331389
options.format,
332390
),
333391
)
392+
elif options.format == "jsongz":
393+
filename = directory + "/%s/%s.jsongz" % (db, table)
394+
writer = multiprocessing.Process(
395+
target=json_gz_writer,
396+
args=(
397+
filename,
398+
options.fields,
399+
task_queue,
400+
error_queue,
401+
options.format,
402+
options.compression_level,
403+
),
404+
)
334405
elif options.format == "csv":
335406
filename = directory + "/%s/%s.csv" % (db, table)
336407
writer = multiprocessing.Process(

rethinkdb/_import.py

100755100644
Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@
3030
import optparse
3131
import os
3232
import signal
33+
import struct
3334
import sys
3435
import time
3536
import traceback
37+
import zlib
3638
from multiprocessing.queues import Queue, SimpleQueue
3739

3840
import six
@@ -56,6 +58,9 @@
5658
JSON_MAX_BUFFER_SIZE = 128 * 1024 * 1024
5759
MAX_NESTING_DEPTH = 100
5860

61+
# jsongz parameters
62+
JSON_GZ_READ_CHUNK_SIZE = 16 * 1024
63+
5964
Error = collections.namedtuple("Error", ["message", "traceback", "file"])
6065

6166

@@ -133,7 +138,10 @@ def __init__(
133138
self._source = source
134139
else:
135140
try:
136-
self._source = codecs.open(source, mode="r", encoding="utf-8")
141+
if self.format == "jsongz":
142+
self._source = open(source, mode="rb")
143+
else:
144+
self._source = codecs.open(source, mode="r", encoding="utf-8")
137145
except IOError as exc:
138146
default_logger.exception(exc)
139147
raise ValueError(
@@ -145,9 +153,16 @@ def __init__(
145153
and self._source.name
146154
and os.path.isfile(self._source.name)
147155
):
148-
self._bytes_size.value = os.path.getsize(source)
156+
self._bytes_size.value = os.path.getsize(self._source.name)
149157
if self._bytes_size.value == 0:
150-
raise ValueError("Source is zero-length: %s" % source)
158+
raise ValueError("Source is zero-length: %s" % self._source.name)
159+
160+
# get uncompressed file length from gzip trailer (last 4 bytes)
161+
if self.format == "jsongz":
162+
# TODO: check valid gzip
163+
self._source.seek(-4, 2)
164+
self._bytes_size.value = struct.unpack("I", self._source.read(4))[0]
165+
self._source.seek(0)
151166

152167
# table info
153168
self.db = db
@@ -500,6 +515,9 @@ class JsonSourceFile(SourceFile):
500515
_buffer_pos = None
501516
_buffer_end = None
502517

518+
def read_chunk(self, max_length):
    """Read and return at most *max_length* characters from the underlying source.

    Hook point so subclasses (e.g. the gzip-backed source) can interpose
    decompression without touching the buffering logic in fill_buffer.
    """
    return self._source.read(max_length)
503521
def fill_buffer(self):
504522
if self._buffer_str is None:
505523
self._buffer_str = ""
@@ -520,7 +538,7 @@ def fill_buffer(self):
520538
if read_target < 1:
521539
raise AssertionError("Can not set the read target and full the buffer")
522540

523-
new_chunk = self._source.read(read_target)
541+
new_chunk = self.read_chunk(read_target)
524542

525543
if len(new_chunk) == 0:
526544
raise StopIteration() # file ended
@@ -634,6 +652,28 @@ def teardown(self):
634652
)
635653

636654

655+
class JsonGzSourceFile(JsonSourceFile):
    """JSON source read from a gzip-compressed (.jsongz) export file."""

    format = "jsongz"

    def __init__(self, *args, **kwargs):
        # initialize zlib decompressor
        # wbits 31 = window size MAX_WBITS & expects gzip header and trailer
        self._decompressor = zlib.decompressobj(31)

        # Incremental UTF-8 decoder: a raw chunk boundary may fall in the
        # middle of a multi-byte sequence, which a plain bytes.decode("utf-8")
        # would reject with UnicodeDecodeError.  The incremental decoder
        # buffers the trailing partial bytes until the next read_chunk call.
        self._decoder = codecs.getincrementaldecoder("utf-8")()

        super(JsonGzSourceFile, self).__init__(*args, **kwargs)

    def read_chunk(self, max_length):
        """Return up to max_length bytes' worth of decompressed text.

        Reads the compressed source in JSON_GZ_READ_CHUNK_SIZE slices,
        feeding each slice (plus any unconsumed tail from the previous
        decompress call) through the zlib decompressor until max_length
        decompressed bytes are available or the file is exhausted.
        """
        chunk = b""
        while len(chunk) < max_length:
            compressed_buf = (
                self._decompressor.unconsumed_tail
                + self._source.read(JSON_GZ_READ_CHUNK_SIZE)
            )
            if len(compressed_buf) == 0:
                break  # end of compressed input
            chunk += self._decompressor.decompress(
                compressed_buf, max_length - len(chunk)
            )
        # final=True only at EOF, so a truncated multi-byte sequence at
        # end-of-file still raises rather than being silently dropped
        return self._decoder.decode(chunk, final=len(chunk) == 0)
637677
class CsvSourceFile(SourceFile):
638678
format = "csv"
639679

@@ -1552,6 +1592,8 @@ def parse_info_file(path):
15521592
table_type_options = None
15531593
if ext == ".json":
15541594
table_type = JsonSourceFile
1595+
elif ext == ".jsongz":
1596+
table_type = JsonGzSourceFile
15551597
elif ext == ".csv":
15561598
table_type = CsvSourceFile
15571599
table_type_options = {
@@ -1622,7 +1664,7 @@ def parse_info_file(path):
16221664
table, ext = os.path.splitext(filename)
16231665
table = os.path.basename(table)
16241666

1625-
if ext not in [".json", ".csv", ".info"]:
1667+
if ext not in [".json", ".jsongz", ".csv", ".info"]:
16261668
files_ignored.append(os.path.join(root, filename))
16271669
elif ext == ".info":
16281670
pass # Info files are included based on the data files
@@ -1657,6 +1699,8 @@ def parse_info_file(path):
16571699
table_type = None
16581700
if ext == ".json":
16591701
table_type = JsonSourceFile
1702+
elif ext == ".jsongz":
1703+
table_type = JsonGzSourceFile
16601704
elif ext == ".csv":
16611705
table_type = CsvSourceFile
16621706
else:

0 commit comments

Comments
 (0)