diff --git a/.travis.yml b/.travis.yml index 9d45f19b..6031d1cb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -36,7 +36,7 @@ matrix: install: - pip install tox-travis - - pip install setuptools + - pip install setuptools==30.0.0 - pip install coveralls - mkdir -p "influxdb_install/${INFLUXDB_VER}" - if [ -n "${INFLUXDB_VER}" ] ; then wget "https://dl.influxdata.com/influxdb/releases/influxdb_${INFLUXDB_VER}_amd64.deb" ; fi diff --git a/README.rst b/README.rst index a40ed148..4002ed03 100644 --- a/README.rst +++ b/README.rst @@ -71,6 +71,7 @@ Main dependency is: Additional dependencies are: - pandas: for writing from and reading to DataFrames (http://pandas.pydata.org/) +- pandas: for ns resolution of timepoints - Sphinx: Tool to create and manage the documentation (http://sphinx-doc.org/) - Nose: to auto-discover tests (http://nose.readthedocs.org/en/latest/) - Mock: to mock tests (https://pypi.python.org/pypi/mock) diff --git a/influxdb/_dataframe_client.py b/influxdb/_dataframe_client.py index d16e29ca..dc4484e9 100644 --- a/influxdb/_dataframe_client.py +++ b/influxdb/_dataframe_client.py @@ -15,6 +15,27 @@ from .client import InfluxDBClient from .line_protocol import _escape_tag +# Precision factors must be ints for the floor division to be exact. 
+# If the factor is a float, the floor division result is approximate. +# Example: the issue is only observable when the nanosecond resolution +# values are greater than 895ns +# ts = pd.Timestamp('2013-01-01 23:10:55.123456987+00:00') +# ts_ns = np.int64(ts.value) +# # For conversion to microsecond +# precision_factor=1e3 +# expected_ts_us = 1357081855123456 +# # following is INCORRECT 1357081855123457 +# np.int64(ts_ns // precision_factor) +# # following is CORRECT 1357081855123456 +# np.int64(ts_ns // np.int64(precision_factor)) + +_time_precision_factors = {"n": 1, + "u": np.int64(1e3), + "ms": np.int64(1e6), + "s": np.int64(1e9), + "m": np.int64(1e9 * 60), + "h": np.int64(1e9 * 3600), } + def _pandas_time_unit(time_precision): unit = time_precision @@ -261,20 +282,13 @@ def _convert_dataframe_to_json(dataframe, # Convert dtype for json serialization dataframe = dataframe.astype('object') - precision_factor = { - "n": 1, - "u": 1e3, - "ms": 1e6, - "s": 1e9, - "m": 1e9 * 60, - "h": 1e9 * 3600, - }.get(time_precision, 1) + precision_factor = _time_precision_factors.get(time_precision, 1) points = [ {'measurement': measurement, 'tags': dict(list(tag.items()) + list(tags.items())), 'fields': rec, - 'time': np.int64(ts.value / precision_factor)} + 'time': np.int64(ts.value // precision_factor)} for ts, tag, rec in zip(dataframe.index, dataframe[tag_columns].to_dict('record'), dataframe[field_columns].to_dict('record')) @@ -331,21 +345,14 @@ def _convert_dataframe_to_lines(self, field_columns = list(column_series[~column_series.isin( tag_columns)]) - precision_factor = { - "n": 1, - "u": 1e3, - "ms": 1e6, - "s": 1e9, - "m": 1e9 * 60, - "h": 1e9 * 3600, - }.get(time_precision, 1) + precision_factor = _time_precision_factors.get(time_precision, 1) # Make array of timestamp ints if isinstance(dataframe.index, pd.PeriodIndex): - time = ((dataframe.index.to_timestamp().values.astype(np.int64) / + time = ((dataframe.index.to_timestamp().values.astype(np.int64) // 
precision_factor).astype(np.int64).astype(str)) else: - time = ((pd.to_datetime(dataframe.index).values.astype(np.int64) / + time = ((pd.to_datetime(dataframe.index).values.astype(np.int64) // precision_factor).astype(np.int64).astype(str)) # If tag columns exist, make an array of formatted tag keys and values @@ -454,16 +461,6 @@ def _stringify_dataframe(dframe, numeric_precision, datatype='field'): return dframe def _datetime_to_epoch(self, datetime, time_precision='s'): - seconds = (datetime - self.EPOCH).total_seconds() - if time_precision == 'h': - return seconds / 3600 - elif time_precision == 'm': - return seconds / 60 - elif time_precision == 's': - return seconds - elif time_precision == 'ms': - return seconds * 1e3 - elif time_precision == 'u': - return seconds * 1e6 - elif time_precision == 'n': - return seconds * 1e9 + nanoseconds = (datetime - self.EPOCH).value + precision_factor = _time_precision_factors.get(time_precision, 1) + return np.int64(nanoseconds // np.int64(precision_factor)) diff --git a/influxdb/line_protocol.py b/influxdb/line_protocol.py index ec59ef47..02ad23fd 100644 --- a/influxdb/line_protocol.py +++ b/influxdb/line_protocol.py @@ -8,44 +8,55 @@ from datetime import datetime from numbers import Integral - -from pytz import UTC -from dateutil.parser import parse from six import binary_type, text_type, integer_types, PY2 -EPOCH = UTC.localize(datetime.utcfromtimestamp(0)) +import pandas as pd # Provide for ns timestamps +import numpy as np # Provided for accurate precision_factor conversion + +EPOCH = pd.Timestamp(0, tz='UTC') + +# Precision factors must be ints for the floor division to be exact. 
+# If the factor is a float, the floor division result is approximate. +# Example: the issue is only observable when the nanosecond resolution +# values are greater than 895ns +# ts = pd.Timestamp('2013-01-01 23:10:55.123456987+00:00') +# ts_ns = np.int64(ts.value) +# # For conversion to microsecond +# precision_factor=1e3 +# expected_ts_us = 1357081855123456 +# np.int64(ts_ns // precision_factor) # is INCORRECT 1357081855123457 +# np.int64(ts_ns // np.int64(precision_factor)) # is CORRECT 1357081855123456 +_time_precision_factors = {"n": 1, + "u": np.int64(1e3), + "ms": np.int64(1e6), + "s": np.int64(1e9), + "m": np.int64(1e9 * 60), + "h": np.int64(1e9 * 3600), } -def _convert_timestamp(timestamp, precision=None): + +def _convert_timestamp(timestamp, time_precision=None): if isinstance(timestamp, Integral): return timestamp # assume precision is correct if timestamp is int if isinstance(_get_unicode(timestamp), text_type): - timestamp = parse(timestamp) + timestamp = pd.Timestamp(timestamp) - if isinstance(timestamp, datetime): + if isinstance(timestamp, datetime): # change to pandas.Timestamp if not timestamp.tzinfo: - timestamp = UTC.localize(timestamp) - - ns = (timestamp - EPOCH).total_seconds() * 1e9 - if precision is None or precision == 'n': - return ns - - if precision == 'u': - return ns / 1e3 - - if precision == 'ms': - return ns / 1e6 - - if precision == 's': - return ns / 1e9 - - if precision == 'm': - return ns / 1e9 / 60 + timestamp = pd.Timestamp(timestamp, tz='UTC') + else: + timestamp = pd.Timestamp(timestamp) - if precision == 'h': - return ns / 1e9 / 3600 + if isinstance(timestamp, pd._libs.tslib.Timestamp): + if not timestamp.tzinfo: # set to UTC for time since EPOCH + timestamp = pd.Timestamp(timestamp, tz='UTC') + else: + timestamp = timestamp.astimezone('UTC') + nanoseconds = (timestamp - EPOCH).value + precision_factor = _time_precision_factors.get(time_precision, 1) + return np.int64(nanoseconds // np.int64(precision_factor)) raise 
ValueError(timestamp) diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index 99a7f42b..e2e47ee9 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -90,7 +90,7 @@ def setUp(self): "host": "server01", "region": "us-west" }, - "time": "2009-11-10T23:00:00.123456Z", + "time": "2009-11-10 23:10:55.123456987", "fields": { "value": 0.64 } @@ -210,7 +210,7 @@ def test_write_points(self): ) self.assertEqual( 'cpu_load_short,host=server01,region=us-west ' - 'value=0.64 1257894000123456000\n', + 'value=0.64 1257894655123456987\n', m.last_request.body.decode('utf-8'), ) @@ -232,7 +232,7 @@ def test_write_points_toplevel_attributes(self): ) self.assertEqual( 'cpu_load_short,host=server01,region=us-west,tag=hello ' - 'value=0.64 1257894000123456000\n', + 'value=0.64 1257894655123456987\n', m.last_request.body.decode('utf-8'), ) @@ -281,7 +281,7 @@ def test_write_points_udp(self): self.assertEqual( 'cpu_load_short,host=server01,region=us-west ' - 'value=0.64 1257894000123456000\n', + 'value=0.64 1257894655123456987\n', received_data.decode() ) @@ -306,35 +306,35 @@ def test_write_points_with_precision(self): cli.write_points(self.dummy_points, time_precision='n') self.assertEqual( b'cpu_load_short,host=server01,region=us-west ' - b'value=0.64 1257894000123456000\n', + b'value=0.64 1257894655123456987\n', m.last_request.body, ) cli.write_points(self.dummy_points, time_precision='u') self.assertEqual( b'cpu_load_short,host=server01,region=us-west ' - b'value=0.64 1257894000123456\n', + b'value=0.64 1257894655123456\n', m.last_request.body, ) cli.write_points(self.dummy_points, time_precision='ms') self.assertEqual( b'cpu_load_short,host=server01,region=us-west ' - b'value=0.64 1257894000123\n', + b'value=0.64 1257894655123\n', m.last_request.body, ) cli.write_points(self.dummy_points, time_precision='s') self.assertEqual( b"cpu_load_short,host=server01,region=us-west " - b"value=0.64 1257894000\n", + b"value=0.64 
1257894655\n", m.last_request.body, ) cli.write_points(self.dummy_points, time_precision='m') self.assertEqual( b'cpu_load_short,host=server01,region=us-west ' - b'value=0.64 20964900\n', + b'value=0.64 20964910\n', m.last_request.body, ) @@ -377,7 +377,7 @@ def test_write_points_with_precision_udp(self): received_data, addr = s.recvfrom(1024) self.assertEqual( b'cpu_load_short,host=server01,region=us-west ' - b'value=0.64 1257894000123456000\n', + b'value=0.64 1257894655123456987\n', received_data, ) @@ -385,7 +385,7 @@ def test_write_points_with_precision_udp(self): received_data, addr = s.recvfrom(1024) self.assertEqual( b'cpu_load_short,host=server01,region=us-west ' - b'value=0.64 1257894000123456\n', + b'value=0.64 1257894655123456\n', received_data, ) @@ -393,7 +393,7 @@ def test_write_points_with_precision_udp(self): received_data, addr = s.recvfrom(1024) self.assertEqual( b'cpu_load_short,host=server01,region=us-west ' - b'value=0.64 1257894000123\n', + b'value=0.64 1257894655123\n', received_data, ) @@ -401,7 +401,7 @@ def test_write_points_with_precision_udp(self): received_data, addr = s.recvfrom(1024) self.assertEqual( b"cpu_load_short,host=server01,region=us-west " - b"value=0.64 1257894000\n", + b"value=0.64 1257894655\n", received_data, ) @@ -409,7 +409,7 @@ def test_write_points_with_precision_udp(self): received_data, addr = s.recvfrom(1024) self.assertEqual( b'cpu_load_short,host=server01,region=us-west ' - b'value=0.64 20964900\n', + b'value=0.64 20964910\n', received_data, ) diff --git a/influxdb/tests/dataframe_client_test.py b/influxdb/tests/dataframe_client_test.py index 90312ed8..7fa801f1 100644 --- a/influxdb/tests/dataframe_client_test.py +++ b/influxdb/tests/dataframe_client_test.py @@ -994,37 +994,55 @@ def test_get_list_database(self): ) def test_datetime_to_epoch(self): - """Test convert datetime to epoch in TestDataFrameClient object.""" - timestamp = pd.Timestamp('2013-01-01 00:00:00.000+00:00') + """Test convert datetime to epoch 
in TestDataFrameClient object. + + Precision factors must be ints for the floor division to be exact. + If the factor is a float, the floor division result is approximate. + It is important to choose a test value whose nanosecond resolution + values are greater than 895ns + + Example: the issue is only observable when ns > 895ns + # ts = pd.Timestamp('2013-01-01 23:10:55.123456987+00:00') + # ts_ns = np.int64(ts.value) + # # For conversion to microsecond + # precision_factor=1e3 + # expected_ts_us = 1357081855123456 + # following is INCORRECT 1357081855123457 + # np.int64(ts_ns // precision_factor) + # following is CORRECT 1357081855123456 + # np.int64(ts_ns // np.int64(precision_factor)) + + """ + timestamp = pd.Timestamp('2013-01-01 23:10:55.123456987+00:00') cli = DataFrameClient('host', 8086, 'username', 'password', 'db') self.assertEqual( cli._datetime_to_epoch(timestamp), - 1356998400.0 + 1357081855 ) self.assertEqual( cli._datetime_to_epoch(timestamp, time_precision='h'), - 1356998400.0 / 3600 + 1357081855 // 3600 ) self.assertEqual( cli._datetime_to_epoch(timestamp, time_precision='m'), - 1356998400.0 / 60 + 1357081855 // 60 ) self.assertEqual( cli._datetime_to_epoch(timestamp, time_precision='s'), - 1356998400.0 + 1357081855 ) self.assertEqual( cli._datetime_to_epoch(timestamp, time_precision='ms'), - 1356998400000.0 + 1357081855123 ) self.assertEqual( cli._datetime_to_epoch(timestamp, time_precision='u'), - 1356998400000000.0 + 1357081855123456 ) self.assertEqual( cli._datetime_to_epoch(timestamp, time_precision='n'), - 1356998400000000000.0 + 1357081855123456987 ) def test_dsn_constructor(self): diff --git a/influxdb/tests/test_line_protocol.py b/influxdb/tests/test_line_protocol.py index 71828f62..2d03b85f 100644 --- a/influxdb/tests/test_line_protocol.py +++ b/influxdb/tests/test_line_protocol.py @@ -11,6 +11,7 @@ from pytz import UTC, timezone from influxdb import line_protocol +import pandas as pd class TestLineProtocol(unittest.TestCase): @@ -46,33 +47,114 @@ 
def test_make_lines(self): 'bool_val=True,float_val=1.1,int_val=1i,string_val="hello!"\n' ) + def test_convert_timestamp(self): + """Test line_protocol _convert_timestamp. + + Precision factors must be ints for the floor division to be exact. If + the factor is a float, the floor division result is approximate. + It is important to choose a test value whose nanosecond resolution + values are greater than 895ns + + Example: the issue is only observable when ns > 895ns + # ts = pd.Timestamp('2013-01-01 23:10:55.123456987+00:00') + # ts_ns = np.int64(ts.value) + # # For conversion to microsecond + # precision_factor=1e3 + # expected_ts_us = 1357081855123456 + # # following is INCORRECT 1357081855123457 + # np.int64(ts_ns // precision_factor) + # # following is CORRECT 1357081855123456 + # np.int64(ts_ns // np.int64(precision_factor)) + + """ + + # TODO: add tests for timestamp instances + # 1) isinstance(timestamp, Integral) + # 2) isinstance(_get_unicode(timestamp), text_type) + # 3) isinstance(timestamp, datetime) with tzinfo + # 4) isinstance(timestamp, datetime) without tzinfo + + timestamp = pd.Timestamp('2013-01-01 23:10:55.123456987+00:00') + self.assertEqual( + line_protocol._convert_timestamp(timestamp), + 1357081855123456987 + ) + self.assertEqual( + line_protocol._convert_timestamp(timestamp, time_precision='h'), + 1357081855 // 3600 + ) + self.assertEqual( + line_protocol._convert_timestamp(timestamp, time_precision='m'), + 1357081855 // 60 + ) + self.assertEqual( + line_protocol._convert_timestamp(timestamp, time_precision='s'), + 1357081855 + ) + self.assertEqual( + line_protocol._convert_timestamp(timestamp, time_precision='ms'), + 1357081855123 + ) + self.assertEqual( + line_protocol._convert_timestamp(timestamp, time_precision='u'), + 1357081855123456 + ) + self.assertEqual( + line_protocol._convert_timestamp(timestamp, time_precision='n'), + 1357081855123456987 + ) + def test_timezone(self): """Test timezone in TestLineProtocol object.""" + # datetime tests dt 
= datetime(2009, 11, 10, 23, 0, 0, 123456) utc = UTC.localize(dt) berlin = timezone('Europe/Berlin').localize(dt) eastern = berlin.astimezone(timezone('US/Eastern')) - data = { - "points": [ - {"measurement": "A", "fields": {"val": 1}, - "time": 0}, - {"measurement": "A", "fields": {"val": 1}, - "time": "2009-11-10T23:00:00.123456Z"}, - {"measurement": "A", "fields": {"val": 1}, "time": dt}, - {"measurement": "A", "fields": {"val": 1}, "time": utc}, - {"measurement": "A", "fields": {"val": 1}, "time": berlin}, - {"measurement": "A", "fields": {"val": 1}, "time": eastern}, - ] + # pandas ns timestamp tests + pddt = pd.Timestamp('2009-11-10 23:00:00.123456987') + pdutc = pd.Timestamp(pddt, tz='UTC') + pdberlin = pdutc.astimezone('Europe/Berlin') + pdeastern = pdberlin.astimezone('US/Eastern') + + data = {"points": [ + {"measurement": "A", "fields": {"val": 1}, "time": 0}, + # string representations + # String version for datetime + {"measurement": "A", "fields": {"val": 1}, + "time": "2009-11-10T23:00:00.123456Z"}, + # String version for pandas ns timestamp + {"measurement": "A", "fields": {"val": 1}, + "time": "2009-11-10 23:00:00.123456987"}, + # datetime + {"measurement": "A", "fields": {"val": 1}, "time": dt}, + {"measurement": "A", "fields": {"val": 1}, "time": utc}, + {"measurement": "A", "fields": {"val": 1}, "time": berlin}, + {"measurement": "A", "fields": {"val": 1}, "time": eastern}, + # pandas timestamp + {"measurement": "A", "fields": {"val": 1}, "time": pddt}, + {"measurement": "A", "fields": {"val": 1}, "time": pdutc}, + {"measurement": "A", "fields": {"val": 1}, "time": pdberlin}, + {"measurement": "A", "fields": {"val": 1}, "time": pdeastern}, + ] } + self.assertEqual( line_protocol.make_lines(data), '\n'.join([ 'A val=1i 0', 'A val=1i 1257894000123456000', + 'A val=1i 1257894000123456987', + # datetime results 'A val=1i 1257894000123456000', 'A val=1i 1257894000123456000', 'A val=1i 1257890400123456000', 'A val=1i 1257890400123456000', + # pandas ns 
timestamp results + 'A val=1i 1257894000123456987', + 'A val=1i 1257894000123456987', + 'A val=1i 1257894000123456987', + 'A val=1i 1257894000123456987', ]) + '\n' )