Skip to content

Commit ff2cbb8

Browse files
committed
feat: Add arrow serialization helpers
1 parent fd93687 commit ff2cbb8

File tree

5 files changed

+69
-1
lines changed

5 files changed

+69
-1
lines changed

cloudquery/plugin_v3/arrow.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
from pyarrow import ipc
2+
import pyarrow as pa
3+
from typing import List
4+
5+
def schema_to_bytes(sc: pa.Schema) -> bytes:
    """Serialize an Arrow schema to Arrow IPC stream bytes.

    Opening (and immediately closing) an IPC stream writer emits the
    schema message plus the end-of-stream marker without writing any
    record batches — that is sufficient to round-trip the schema,
    including its metadata.

    :param sc: schema to serialize
    :return: IPC stream bytes containing only the schema
    """
    sink = pa.BufferOutputStream()
    try:
        # No batches are written; the context manager alone produces
        # the schema message. The writer handle itself is unused.
        with pa.ipc.new_stream(sink, sc):
            pass
        return sink.getvalue().to_pybytes()
    finally:
        sink.close()
16+
17+
def new_schema_from_bytes(buf: bytes) -> pa.Schema:
    """Deserialize an Arrow schema from Arrow IPC stream bytes.

    :param buf: IPC stream bytes as produced by ``schema_to_bytes``
    :return: the schema carried by the stream
    """
    with pa.ipc.open_stream(buf) as reader:
        return reader.schema
25+
26+
def schemas_to_bytes(schemas: List[pa.Schema]) -> List[bytes]:
    """Serialize each schema to Arrow IPC stream bytes.

    :param schemas: schemas to serialize
    :return: one bytes buffer per input schema, in the same order
    """
    return [schema_to_bytes(schema) for schema in schemas]
31+
32+
def new_schemas_from_bytes(bufs: List[bytes]) -> List[pa.Schema]:
    """Deserialize a list of IPC stream byte buffers into schemas.

    :param bufs: buffers as produced by ``schemas_to_bytes``
    :return: one schema per input buffer, in the same order
    """
    return [new_schema_from_bytes(b) for b in bufs]
37+
38+
def record_to_bytes(rec: pa.RecordBatch) -> bytes:
    """Serialize a single record batch to Arrow IPC stream bytes.

    The resulting stream carries the batch's schema followed by the
    batch itself.

    :param rec: record batch to serialize
    :return: IPC stream bytes containing the schema and the batch
    """
    out = pa.BufferOutputStream()
    try:
        with pa.ipc.new_stream(out, rec.schema) as w:
            w.write_batch(rec)
        return out.getvalue().to_pybytes()
    finally:
        out.close()
46+
47+
def new_record_from_bytes(buf: bytes) -> pa.RecordBatch:
    """Deserialize the first record batch from Arrow IPC stream bytes.

    :param buf: IPC stream bytes as produced by ``record_to_bytes``
    :return: the first record batch in the stream
    """
    with pa.ipc.open_stream(buf) as reader:
        return reader.read_next_batch()

setup.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
"grpcio >= 1.56.0",
1515
"grpcio-tools >= 1.56.0",
1616
"protobuf >= 4.23.4",
17+
"pyarrow >= 12.0.1"
1718
]
1819
url = "https://github.com/cloudquery/plugin-pb-python"
1920

@@ -36,7 +37,7 @@
3637
]
3738
setuptools.setup(
3839
name=name,
39-
version="0.0.12",
40+
version="0.0.13",
4041
description=description,
4142
long_description=long_description,
4243
author="CloudQuery LTD",

tests/__init__.py

Whitespace-only changes.

tests/plugin_v3/__init__.py

Whitespace-only changes.

tests/plugin_v3/arrow.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import pyarrow as pa
2+
from cloudquery.plugin_v3.arrow import schemas_to_bytes, new_schemas_from_bytes, record_to_bytes, new_record_from_bytes
3+
4+
def test_schema_round_trip():
    """A schema — including its metadata — survives a bytes round trip."""
    original = pa.schema(
        fields=[pa.field("a", pa.int64())],
        metadata={"foo": "bar", "baz": "quux"},
    )
    encoded = schemas_to_bytes([original])
    decoded = new_schemas_from_bytes(encoded)
    assert len(decoded) == 1
    assert decoded[0].equals(original)
10+
11+
def test_record_round_trip():
    """A record batch survives a bytes round trip unchanged."""
    sc = pa.schema(
        fields=[pa.field("a", pa.int64())],
        metadata={"foo": "bar", "baz": "quux"},
    )
    original = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], schema=sc)
    restored = new_record_from_bytes(record_to_bytes(original))
    assert original.equals(restored)

0 commit comments

Comments
 (0)