Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Add support for messagepack #734

Merged
merged 4 commits into from
Dec 5, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 35 additions & 7 deletions influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
import random

import json
import struct
import datetime
import socket
import msgpack
import requests
import requests.exceptions
from six.moves import xrange
Expand Down Expand Up @@ -128,7 +131,7 @@ def __init__(self,

self._headers = {
'Content-Type': 'application/json',
'Accept': 'text/plain'
'Accept': 'application/x-msgpack'
}

@property
Expand Down Expand Up @@ -277,13 +280,22 @@ def request(self, url, method='GET', params=None, data=None,
time.sleep((2 ** _try) * random.random() / 100.0)
if not retry:
raise

def reformat_error(response):
err = self._parse_msgpack(response)
if err:
return json.dumps(err, separators=(',', ':'))
else:
return response.content

# if there's not an error, there must have been a successful response
if 500 <= response.status_code < 600:
raise InfluxDBServerError(response.content)
raise InfluxDBServerError(reformat_error(response))
elif response.status_code == expected_response_code:
return response
else:
raise InfluxDBClientError(response.content, response.status_code)
err_msg = reformat_error(response)
raise InfluxDBClientError(err_msg, response.status_code)

def write(self, data, params=None, expected_response_code=204,
protocol='json'):
Expand Down Expand Up @@ -342,6 +354,21 @@ def _read_chunked_response(response, raise_errors=True):
_key, []).extend(result[_key])
return ResultSet(result_set, raise_errors=raise_errors)

@staticmethod
def _parse_msgpack(response):
"""Return the decoded response if it is encoded as msgpack."""
def hook(code, data):
if code == 5:
(epoch_s, epoch_ns) = struct.unpack(">QI", data)
time = datetime.datetime.utcfromtimestamp(epoch_s)
time += datetime.timedelta(microseconds=(epoch_ns / 1000))
return time.isoformat() + 'Z'
return msgpack.ExtType(code, data)

headers = response.headers
if headers and headers["Content-Type"] == "application/x-msgpack":
return msgpack.unpackb(response.content, ext_hook=hook, raw=False)

def query(self,
query,
params=None,
Expand Down Expand Up @@ -434,10 +461,11 @@ def query(self,
expected_response_code=expected_response_code
)

if chunked:
return self._read_chunked_response(response)

data = response.json()
data = self._parse_msgpack(response)
if not data:
if chunked:
return self._read_chunked_response(response)
data = response.json()

results = [
ResultSet(result, raise_errors=raise_errors)
Expand Down
23 changes: 23 additions & 0 deletions influxdb/tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,29 @@ def test_query(self):
[{'value': 0.64, 'time': '2009-11-10T23:00:00Z'}]
)

def test_query_msgpack(self):
"""Test query method with a messagepack response."""
example_response = bytes(bytearray.fromhex(
"81a7726573756c74739182ac73746174656d656e745f696400a673657269"
"65739183a46e616d65a161a7636f6c756d6e7392a474696d65a176a67661"
"6c7565739192c70c05000000005d26178a019096c8cb3ff0000000000000"
))

with requests_mock.Mocker() as m:
m.register_uri(
requests_mock.GET,
"http://localhost:8086/query",
request_headers={"Accept": "application/x-msgpack"},
headers={"Content-Type": "application/x-msgpack"},
content=example_response
)
rs = self.cli.query('select * from a')

self.assertListEqual(
list(rs.get_points()),
[{'v': 1.0, 'time': '2019-07-10T16:51:22.026253Z'}]
)

def test_select_into_post(self):
"""Test SELECT.*INTO is POSTed."""
example_response = (
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ python-dateutil>=2.6.0
pytz
requests>=2.17.0
six>=1.10.0
msgpack==0.6.1