Skip to content

Add optional Arrow deserialization support #86

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/guide/configuration.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ When using the `ignore_status` parameter the error response will be returned ser
[[serializer]]
=== Serializers

Serializers transform bytes on the wire into native Python objects and vice-versa. By default the client ships with serializers for `application/json`, `application/x-ndjson`, `text/*`, and `application/mapbox-vector-tile`.
Serializers transform bytes on the wire into native Python objects and vice-versa. By default the client ships with serializers for `application/json`, `application/x-ndjson`, `text/*`, `application/vnd.apache.arrow.stream` and `application/mapbox-vector-tile`.

You can define custom serializers via the `serializers` parameter:

Expand Down
34 changes: 34 additions & 0 deletions elasticsearch_serverless/serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@
except ImportError:
_OrjsonSerializer = None # type: ignore[assignment,misc]

try:
import pyarrow as pa

__all__.append("PyArrowSerializer")
except ImportError:
pa = None


class JsonSerializer(_JsonSerializer):
mimetype: ClassVar[str] = "application/json"
Expand Down Expand Up @@ -114,6 +121,29 @@ def dumps(self, data: bytes) -> bytes:
raise SerializationError(f"Cannot serialize {data!r} into a MapBox vector tile")


if pa is not None:

class PyArrowSerializer(Serializer):
"""PyArrow serializer for deserializing Arrow Stream data."""

mimetype: ClassVar[str] = "application/vnd.apache.arrow.stream"

def loads(self, data: bytes) -> pa.Table:
try:
with pa.ipc.open_stream(data) as reader:
return reader.read_all()
except pa.ArrowException as e:
raise SerializationError(
message=f"Unable to deserialize as Arrow stream: {data!r}",
errors=(e,),
)

def dumps(self, data: Any) -> bytes:
raise SerializationError(
message="Elasticsearch does not accept Arrow input data"
)


DEFAULT_SERIALIZERS: Dict[str, Serializer] = {
JsonSerializer.mimetype: JsonSerializer(),
MapboxVectorTileSerializer.mimetype: MapboxVectorTileSerializer(),
Expand All @@ -122,6 +152,10 @@ def dumps(self, data: bytes) -> bytes:
CompatibilityModeNdjsonSerializer.mimetype: CompatibilityModeNdjsonSerializer(),
}

if pa is not None:
DEFAULT_SERIALIZERS[PyArrowSerializer.mimetype] = PyArrowSerializer()


# Alias for backwards compatibility
JSONSerializer = JsonSerializer

Expand Down
3 changes: 1 addition & 2 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ def lint(session):
session.run("flake8", *SOURCE_FILES)
session.run("python", "utils/license-headers.py", "check", *SOURCE_FILES)

# Workaround to make '-r' to still work despite uninstalling aiohttp below.
session.install(".[async,requests,orjson]", env=INSTALL_ENV)
session.install(".[async,requests,orjson,pyarrow]", env=INSTALL_ENV)

# Run mypy on the package and then the type examples separately for
# the two different mypy use-cases, ourselves and our users.
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ dependencies = [
async = ["aiohttp>=3,<4"]
requests = ["requests>=2.4.0, <3.0.0" ]
orjson = ["orjson>=3"]
pyarrow = ["pyarrow>=1"]
dev = [
"requests>=2, <3",
"aiohttp",
Expand All @@ -65,6 +66,7 @@ dev = [
"nox",
"orjson",
"numpy",
"pyarrow",
"pandas",
"mapbox-vector-tile",
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class CustomSerializer(JsonSerializer):
"application/x-ndjson",
"application/json",
"text/*",
"application/vnd.apache.arrow.stream",
"application/vnd.elasticsearch+json",
"application/vnd.elasticsearch+x-ndjson",
}
Expand All @@ -93,6 +94,7 @@ class CustomSerializer(JsonSerializer):
"application/x-ndjson",
"application/json",
"text/*",
"application/vnd.apache.arrow.stream",
"application/vnd.elasticsearch+json",
"application/vnd.elasticsearch+x-ndjson",
"application/cbor",
Expand Down
3 changes: 3 additions & 0 deletions test_elasticsearch_serverless/test_client/test_serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class CustomSerializer:
"application/json",
"text/*",
"application/x-ndjson",
"application/vnd.apache.arrow.stream",
"application/vnd.mapbox-vector-tile",
"application/vnd.elasticsearch+json",
"application/vnd.elasticsearch+x-ndjson",
Expand Down Expand Up @@ -98,6 +99,7 @@ class CustomSerializer:
"application/json",
"text/*",
"application/x-ndjson",
"application/vnd.apache.arrow.stream",
"application/vnd.mapbox-vector-tile",
"application/vnd.elasticsearch+json",
"application/vnd.elasticsearch+x-ndjson",
Expand All @@ -117,6 +119,7 @@ class CustomSerializer:
"application/json",
"text/*",
"application/x-ndjson",
"application/vnd.apache.arrow.stream",
"application/vnd.mapbox-vector-tile",
"application/vnd.elasticsearch+json",
"application/vnd.elasticsearch+x-ndjson",
Expand Down
21 changes: 21 additions & 0 deletions test_elasticsearch_serverless/test_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from datetime import datetime
from decimal import Decimal

import pyarrow as pa
import pytest

try:
Expand All @@ -35,6 +36,7 @@
from elasticsearch_serverless.serializer import (
JSONSerializer,
OrjsonSerializer,
PyArrowSerializer,
TextSerializer,
)

Expand Down Expand Up @@ -161,6 +163,25 @@ def test_serializes_pandas_category(json_serializer):
assert b'{"d":[1,2,3]}' == json_serializer.dumps({"d": cat})


def test_pyarrow_loads():
data = [
pa.array([1, 2, 3, 4]),
pa.array(["foo", "bar", "baz", None]),
pa.array([True, None, False, True]),
]
batch = pa.record_batch(data, names=["f0", "f1", "f2"])
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema) as writer:
writer.write_batch(batch)

serializer = PyArrowSerializer()
assert serializer.loads(sink.getvalue()).to_pydict() == {
"f0": [1, 2, 3, 4],
"f1": ["foo", "bar", "baz", None],
"f2": [True, None, False, True],
}


def test_json_raises_serialization_error_on_dump_error(json_serializer):
with pytest.raises(SerializationError):
json_serializer.dumps(object())
Expand Down
Loading