Skip to content

Commit d2472f4

Browse files
authored
fix: fixed serialization of DataFrame with empty (NaN) values, fixed escaping whitespaces, fixed order of tags (#123)
1 parent f74d183 commit d2472f4

File tree

4 files changed

+237
-55
lines changed

4 files changed

+237
-55
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
### Bug Fixes
99
1. [#117](https://github.com/influxdata/influxdb-client-python/pull/117): Fixed appending default tags for single Point
1010
1. [#115](https://github.com/influxdata/influxdb-client-python/pull/115): Fixed serialization of `\n`, `\r` and `\t` to Line Protocol, `=` is valid sign for measurement name
11+
1. [#118](https://github.com/influxdata/influxdb-client-python/issues/118): Fixed serialization of DataFrame with empty (NaN) values
1112

1213
## 1.8.0 [2020-06-19]
1314

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import re
2+
from functools import reduce
3+
from itertools import chain
4+
5+
from influxdb_client.client.write.point import _ESCAPE_KEY, _ESCAPE_MEASUREMENT
6+
7+
"""
8+
Functions for serialize Pandas DataFrame.
9+
Much of the code here is inspired by that in the aioinflux packet found here: https://github.com/gusutabopb/aioinflux
10+
"""
11+
12+
13+
def _replace(data_frame):
14+
from ...extras import np
15+
16+
# string columns
17+
obj_cols = {k for k, v in dict(data_frame.dtypes).items() if v is np.dtype('O')}
18+
19+
# number columns
20+
other_cols = set(data_frame.columns) - obj_cols
21+
22+
obj_nans = (f'{k}=nan' for k in obj_cols)
23+
other_nans = (f'{k}=nani?' for k in other_cols)
24+
25+
replacements = [
26+
('|'.join(chain(obj_nans, other_nans)), ''),
27+
(',{2,}', ','),
28+
('|'.join([', ,', ', ', ' ,']), ' '),
29+
]
30+
31+
return replacements
32+
33+
34+
def _itertuples(data_frame):
35+
cols = [data_frame.iloc[:, k] for k in range(len(data_frame.columns))]
36+
return zip(data_frame.index, *cols)
37+
38+
39+
def data_frame_to_list_of_points(data_frame, point_settings, **kwargs):
40+
from ...extras import pd, np
41+
if not isinstance(data_frame, pd.DataFrame):
42+
raise TypeError('Must be DataFrame, but type was: {0}.'
43+
.format(type(data_frame)))
44+
45+
if 'data_frame_measurement_name' not in kwargs:
46+
raise TypeError('"data_frame_measurement_name" is a Required Argument')
47+
48+
if isinstance(data_frame.index, pd.PeriodIndex):
49+
data_frame.index = data_frame.index.to_timestamp()
50+
else:
51+
data_frame.index = pd.to_datetime(data_frame.index)
52+
53+
if data_frame.index.tzinfo is None:
54+
data_frame.index = data_frame.index.tz_localize('UTC')
55+
56+
measurement_name = str(kwargs.get('data_frame_measurement_name')).translate(_ESCAPE_MEASUREMENT)
57+
data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
58+
data_frame_tag_columns = set(data_frame_tag_columns or [])
59+
60+
tags = []
61+
fields = []
62+
keys = []
63+
64+
if point_settings.defaultTags:
65+
for key, value in point_settings.defaultTags.items():
66+
data_frame[key] = value
67+
data_frame_tag_columns.add(key)
68+
69+
for index, (key, value) in enumerate(data_frame.dtypes.items()):
70+
key = str(key)
71+
keys.append(key.translate(_ESCAPE_KEY))
72+
key_format = f'{{keys[{index}]}}'
73+
74+
if key in data_frame_tag_columns:
75+
tags.append({'key': key, 'value': f"{key_format}={{str(p[{index + 1}]).translate(_ESCAPE_KEY)}}"})
76+
elif issubclass(value.type, np.integer):
77+
fields.append(f"{key_format}={{p[{index + 1}]}}i")
78+
elif issubclass(value.type, (np.float, np.bool_)):
79+
fields.append(f"{key_format}={{p[{index + 1}]}}")
80+
else:
81+
fields.append(f"{key_format}=\"{{str(p[{index + 1}]).translate(_ESCAPE_KEY)}}\"")
82+
83+
tags.sort(key=lambda x: x['key'])
84+
tags = ','.join(map(lambda y: y['value'], tags))
85+
86+
fmt = ('{measurement_name}', f'{"," if tags else ""}', tags,
87+
' ', ','.join(fields), ' {p[0].value}')
88+
f = eval("lambda p: f'{}'".format(''.join(fmt)),
89+
{'measurement_name': measurement_name, '_ESCAPE_KEY': _ESCAPE_KEY, 'keys': keys})
90+
91+
for k, v in dict(data_frame.dtypes).items():
92+
if k in data_frame_tag_columns:
93+
data_frame[k].replace('', np.nan, inplace=True)
94+
95+
isnull = data_frame.isnull().any(axis=1)
96+
97+
if isnull.any():
98+
rep = _replace(data_frame)
99+
lp = (reduce(lambda a, b: re.sub(*b, a), rep, f(p))
100+
for p in _itertuples(data_frame))
101+
return list(lp)
102+
else:
103+
return list(map(f, _itertuples(data_frame)))

influxdb_client/client/write_api.py

Lines changed: 4 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
from rx.subject import Subject
1515

1616
from influxdb_client import WritePrecision, WriteService
17-
from influxdb_client.client.write.point import Point, DEFAULT_WRITE_PRECISION, _ESCAPE_KEY
17+
from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points
18+
from influxdb_client.client.write.point import Point, DEFAULT_WRITE_PRECISION
1819
from influxdb_client.rest import ApiException
1920

2021
logger = logging.getLogger(__name__)
@@ -258,7 +259,7 @@ def _serialize(self, record, write_precision, payload, **kwargs):
258259
self._serialize(Point.from_dict(record, write_precision=write_precision),
259260
write_precision, payload, **kwargs)
260261
elif 'DataFrame' in type(record).__name__:
261-
_data = self._data_frame_to_list_of_points(record, precision=write_precision, **kwargs)
262+
_data = data_frame_to_list_of_points(record, self._point_settings, **kwargs)
262263
self._serialize(_data, write_precision, payload, **kwargs)
263264

264265
elif isinstance(record, list):
@@ -284,7 +285,7 @@ def _write_batching(self, bucket, org, data,
284285
precision, **kwargs)
285286

286287
elif 'DataFrame' in type(data).__name__:
287-
self._write_batching(bucket, org, self._data_frame_to_list_of_points(data, precision, **kwargs),
288+
self._write_batching(bucket, org, data_frame_to_list_of_points(data, self._point_settings, **kwargs),
288289
precision, **kwargs)
289290

290291
elif isinstance(data, list):
@@ -306,57 +307,6 @@ def _append_default_tag(self, key, val, record):
306307
for item in record:
307308
self._append_default_tag(key, val, item)
308309

309-
def _itertuples(self, data_frame):
310-
cols = [data_frame.iloc[:, k] for k in range(len(data_frame.columns))]
311-
return zip(data_frame.index, *cols)
312-
313-
def _data_frame_to_list_of_points(self, data_frame, precision, **kwargs):
314-
from ..extras import pd, np
315-
if not isinstance(data_frame, pd.DataFrame):
316-
raise TypeError('Must be DataFrame, but type was: {0}.'
317-
.format(type(data_frame)))
318-
319-
if 'data_frame_measurement_name' not in kwargs:
320-
raise TypeError('"data_frame_measurement_name" is a Required Argument')
321-
322-
if isinstance(data_frame.index, pd.PeriodIndex):
323-
data_frame.index = data_frame.index.to_timestamp()
324-
else:
325-
data_frame.index = pd.to_datetime(data_frame.index)
326-
327-
if data_frame.index.tzinfo is None:
328-
data_frame.index = data_frame.index.tz_localize('UTC')
329-
330-
measurement_name = kwargs.get('data_frame_measurement_name')
331-
data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
332-
data_frame_tag_columns = set(data_frame_tag_columns or [])
333-
334-
tags = []
335-
fields = []
336-
337-
if self._point_settings.defaultTags:
338-
for key, value in self._point_settings.defaultTags.items():
339-
data_frame[key] = value
340-
data_frame_tag_columns.add(key)
341-
342-
for index, (key, value) in enumerate(data_frame.dtypes.items()):
343-
key = str(key).translate(_ESCAPE_KEY)
344-
345-
if key in data_frame_tag_columns:
346-
tags.append(f"{key}={{p[{index + 1}].translate(_ESCAPE_KEY)}}")
347-
elif issubclass(value.type, np.integer):
348-
fields.append(f"{key}={{p[{index + 1}]}}i")
349-
elif issubclass(value.type, (np.float, np.bool_)):
350-
fields.append(f"{key}={{p[{index + 1}]}}")
351-
else:
352-
fields.append(f"{key}=\"{{p[{index + 1}].translate(_ESCAPE_KEY)}}\"")
353-
354-
fmt = (f'{measurement_name}', f'{"," if tags else ""}', ','.join(tags),
355-
' ', ','.join(fields), ' {p[0].value}')
356-
f = eval("lambda p: f'{}'".format(''.join(fmt)))
357-
358-
return list(map(f, self._itertuples(data_frame)))
359-
360310
def _http(self, batch_item: _BatchItem):
361311

362312
logger.debug("Write time series data into InfluxDB: %s", batch_item)

tests/test_WriteApiDataFrame.py

Lines changed: 129 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
from datetime import timedelta
66

77
from influxdb_client import InfluxDBClient, WriteOptions, WriteApi
8-
from influxdb_client.client.write_api import SYNCHRONOUS
8+
from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points
9+
from influxdb_client.client.write_api import SYNCHRONOUS, PointSettings
910
from tests.base_test import BaseTest
1011

1112

@@ -86,3 +87,130 @@ def test_write_num_py(self):
8687
self.assertEqual(result[0].records[1].get_value(), 200.0)
8788

8889
pass
90+
91+
def test_write_nan(self):
92+
from influxdb_client.extras import pd, np
93+
94+
now = pd.Timestamp('2020-04-05 00:00+00:00')
95+
96+
data_frame = pd.DataFrame(data=[[3.1955, np.nan, 20.514305, np.nan],
97+
[5.7310, np.nan, 23.328710, np.nan],
98+
[np.nan, 3.138664, np.nan, 20.755026],
99+
[5.7310, 5.139563, 23.328710, 19.791240]],
100+
index=[now, now + timedelta(minutes=30), now + timedelta(minutes=60),
101+
now + timedelta(minutes=90)],
102+
columns=["actual_kw_price", "forecast_kw_price", "actual_general_use",
103+
"forecast_general_use"])
104+
105+
points = data_frame_to_list_of_points(data_frame=data_frame, point_settings=PointSettings(),
106+
data_frame_measurement_name='measurement')
107+
108+
self.assertEqual(4, len(points))
109+
self.assertEqual("measurement actual_kw_price=3.1955,actual_general_use=20.514305 1586044800000000000",
110+
points[0])
111+
self.assertEqual("measurement actual_kw_price=5.731,actual_general_use=23.32871 1586046600000000000",
112+
points[1])
113+
self.assertEqual("measurement forecast_kw_price=3.138664,forecast_general_use=20.755026 1586048400000000000",
114+
points[2])
115+
self.assertEqual("measurement actual_kw_price=5.731,forecast_kw_price=5.139563,actual_general_use=23.32871,"
116+
"forecast_general_use=19.79124 1586050200000000000",
117+
points[3])
118+
119+
def test_write_tag_nan(self):
120+
from influxdb_client.extras import pd, np
121+
122+
now = pd.Timestamp('2020-04-05 00:00+00:00')
123+
124+
data_frame = pd.DataFrame(data=[["", 3.1955, 20.514305],
125+
['', 5.7310, 23.328710],
126+
[np.nan, 5.7310, 23.328710],
127+
["tag", 3.138664, 20.755026]],
128+
index=[now, now + timedelta(minutes=30),
129+
now + timedelta(minutes=60), now + timedelta(minutes=90)],
130+
columns=["tag", "actual_kw_price", "forecast_kw_price"])
131+
132+
write_api = self.client.write_api(write_options=SYNCHRONOUS, point_settings=PointSettings())
133+
134+
points = data_frame_to_list_of_points(data_frame=data_frame,
135+
point_settings=PointSettings(),
136+
data_frame_measurement_name='measurement',
137+
data_frame_tag_columns={"tag"})
138+
139+
self.assertEqual(4, len(points))
140+
self.assertEqual("measurement actual_kw_price=3.1955,forecast_kw_price=20.514305 1586044800000000000",
141+
points[0])
142+
self.assertEqual("measurement actual_kw_price=5.731,forecast_kw_price=23.32871 1586046600000000000",
143+
points[1])
144+
self.assertEqual("measurement actual_kw_price=5.731,forecast_kw_price=23.32871 1586048400000000000",
145+
points[2])
146+
self.assertEqual("measurement,tag=tag actual_kw_price=3.138664,forecast_kw_price=20.755026 1586050200000000000",
147+
points[3])
148+
149+
write_api.__del__()
150+
151+
def test_escaping_measurement(self):
152+
from influxdb_client.extras import pd, np
153+
154+
now = pd.Timestamp('2020-04-05 00:00+00:00')
155+
156+
data_frame = pd.DataFrame(data=[["coyote_creek", np.int64(100.5)], ["coyote_creek", np.int64(200)]],
157+
index=[now + timedelta(hours=1), now + timedelta(hours=2)],
158+
columns=["location", "water_level"])
159+
160+
points = data_frame_to_list_of_points(data_frame=data_frame,
161+
point_settings=PointSettings(),
162+
data_frame_measurement_name='measu rement',
163+
data_frame_tag_columns={"tag"})
164+
165+
self.assertEqual(2, len(points))
166+
self.assertEqual("measu\\ rement location=\"coyote_creek\",water_level=100i 1586048400000000000",
167+
points[0])
168+
self.assertEqual("measu\\ rement location=\"coyote_creek\",water_level=200i 1586052000000000000",
169+
points[1])
170+
171+
points = data_frame_to_list_of_points(data_frame=data_frame,
172+
point_settings=PointSettings(),
173+
data_frame_measurement_name='measu\nrement2',
174+
data_frame_tag_columns={"tag"})
175+
176+
self.assertEqual(2, len(points))
177+
self.assertEqual("measu\\nrement2 location=\"coyote_creek\",water_level=100i 1586048400000000000",
178+
points[0])
179+
self.assertEqual("measu\\nrement2 location=\"coyote_creek\",water_level=200i 1586052000000000000",
180+
points[1])
181+
182+
def test_tag_escaping_key_and_value(self):
183+
from influxdb_client.extras import pd, np
184+
185+
now = pd.Timestamp('2020-04-05 00:00+00:00')
186+
187+
data_frame = pd.DataFrame(data=[["carriage\nreturn", "new\nline", "t\tab", np.int64(2)], ],
188+
index=[now + timedelta(hours=1), ],
189+
columns=["carriage\rreturn", "new\nline", "t\tab", "l\ne\rv\tel"])
190+
191+
points = data_frame_to_list_of_points(data_frame=data_frame,
192+
point_settings=PointSettings(),
193+
data_frame_measurement_name='h\n2\ro\t_data',
194+
data_frame_tag_columns={"new\nline", "carriage\rreturn", "t\tab"})
195+
196+
self.assertEqual(1, len(points))
197+
self.assertEqual(
198+
"h\\n2\\ro\\t_data,carriage\\rreturn=carriage\\nreturn,new\\nline=new\\nline,t\\tab=t\\tab l\\ne\\rv\\tel=2i 1586048400000000000",
199+
points[0])
200+
201+
def test_tags_order(self):
202+
from influxdb_client.extras import pd, np
203+
204+
now = pd.Timestamp('2020-04-05 00:00+00:00')
205+
206+
data_frame = pd.DataFrame(data=[["c", "a", "b", np.int64(2)], ],
207+
index=[now + timedelta(hours=1), ],
208+
columns=["c", "a", "b", "level"])
209+
210+
points = data_frame_to_list_of_points(data_frame=data_frame,
211+
point_settings=PointSettings(),
212+
data_frame_measurement_name='h2o',
213+
data_frame_tag_columns={"c", "a", "b"})
214+
215+
self.assertEqual(1, len(points))
216+
self.assertEqual("h2o,a=a,b=b,c=c level=2i 1586048400000000000", points[0])

0 commit comments

Comments
 (0)