Skip to content

Commit

Permalink
fix: supports write_precision for write Pandas DataFrame (#270)
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar authored Jun 16, 2021
1 parent a2dbc0b commit e0e46cc
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 12 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
### Deprecated
1. [#264](https://github.com/influxdata/influxdb-client-python/pull/264): Deprecated the `org_id` option of `BucketsApi.create_bucket` in favor of the `org` parameter

### Bug Fixes
1. [#270](https://github.com/influxdata/influxdb-client-python/pull/270): Support `write_precision` when writing a Pandas DataFrame

## 1.18.0 [2021-06-04]

### Breaking Changes
Expand Down
21 changes: 18 additions & 3 deletions influxdb_client/client/write/dataframe_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import re
import math

from influxdb_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, _ESCAPE_MEASUREMENT
from influxdb_client import WritePrecision
from influxdb_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, _ESCAPE_MEASUREMENT, DEFAULT_WRITE_PRECISION


def _itertuples(data_frame):
Expand All @@ -23,8 +24,16 @@ def _any_not_nan(p, indexes):
return any(map(lambda x: _not_nan(p[x]), indexes))


def data_frame_to_list_of_points(data_frame, point_settings, **kwargs):
"""Serialize DataFrame into LineProtocols."""
def data_frame_to_list_of_points(data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION, **kwargs):
"""
Serialize DataFrame into LineProtocols.
:param data_frame: Pandas DataFrame to serialize
:param point_settings: Default Tags
:param precision: The precision for the unix timestamps within the body line-protocol.
:key data_frame_measurement_name: name of measurement for writing Pandas DataFrame
:key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields
"""
# This function is hard to understand but for good reason:
# the approach used here is considerably more efficient
# than the alternatives.
Expand Down Expand Up @@ -179,6 +188,12 @@ def data_frame_to_list_of_points(data_frame, point_settings, **kwargs):
tags = ''.join(tags)
fields = ''.join(fields)
timestamp = '{p[0].value}'
if precision == WritePrecision.US:
timestamp = '{int(p[0].value / 1e3)}'
elif precision == WritePrecision.MS:
timestamp = '{int(p[0].value / 1e6)}'
elif precision == WritePrecision.S:
timestamp = '{int(p[0].value / 1e9)}'

f = eval(f'lambda p: f"""{{measurement_name}}{tags} {fields} {timestamp}"""', {
'measurement_name': measurement_name,
Expand Down
5 changes: 3 additions & 2 deletions influxdb_client/client/write_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ def _serialize(self, record, write_precision, payload, **kwargs):
self._serialize(Point.from_dict(record, write_precision=write_precision),
write_precision, payload, **kwargs)
elif 'DataFrame' in type(record).__name__:
_data = data_frame_to_list_of_points(record, self._point_settings, **kwargs)
_data = data_frame_to_list_of_points(record, self._point_settings, write_precision, **kwargs)
self._serialize(_data, write_precision, payload, **kwargs)

elif isinstance(record, Iterable):
Expand All @@ -338,7 +338,8 @@ def _write_batching(self, bucket, org, data,
precision, **kwargs)

elif 'DataFrame' in type(data).__name__:
self._write_batching(bucket, org, data_frame_to_list_of_points(data, self._point_settings, **kwargs),
self._write_batching(bucket, org,
data_frame_to_list_of_points(data, self._point_settings, precision, **kwargs),
precision, **kwargs)

elif isinstance(data, Iterable):
Expand Down
33 changes: 26 additions & 7 deletions tests/test_WriteApiDataFrame.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import unittest
from datetime import timedelta

from influxdb_client import InfluxDBClient, WriteOptions, WriteApi
from influxdb_client import InfluxDBClient, WriteOptions, WriteApi, WritePrecision
from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points
from influxdb_client.client.write_api import SYNCHRONOUS, PointSettings
from tests.base_test import BaseTest
Expand Down Expand Up @@ -56,8 +56,8 @@ class DataSerializerTest(unittest.TestCase):
def test_convert_data_frame(self):
from influxdb_client.extras import pd, np

num_rows=1500000
col_data={
num_rows = 1500000
col_data = {
'time': np.arange(0, num_rows, 1, dtype=int),
'col1': np.random.choice(['test_a', 'test_b', 'test_c'], size=(num_rows,)),
}
Expand All @@ -69,8 +69,8 @@ def test_convert_data_frame(self):

start = time.time()
data_frame_to_list_of_points(data_frame, PointSettings(),
data_frame_measurement_name='h2o_feet',
data_frame_tag_columns=['location'])
data_frame_measurement_name='h2o_feet',
data_frame_tag_columns=['location'])

print("Time elapsed: ", (time.time() - start))

Expand Down Expand Up @@ -291,7 +291,7 @@ def test_with_default_tags(self):
self.assertEqual("h2o,t1=a2,t2=every,t3=c2 value=2i 1586052000000000000", points[1])

# Check that the data frame hasn't been changed (an earlier version did change it)
self.assertEqual(True, (data_frame == original_data).all(axis = None), f'data changed; old:\n{original_data}\nnew:\n{data_frame}')
self.assertEqual(True, (data_frame == original_data).all(axis=None), f'data changed; old:\n{original_data}\nnew:\n{data_frame}')

# Check that the default tags won't override actual column data.
# This is arguably incorrect behavior, but it's how it works currently.
Expand All @@ -304,7 +304,7 @@ def test_with_default_tags(self):
self.assertEqual("h2o,t1=a1,t3=c1 value=1i 1586048400000000000", points[0])
self.assertEqual("h2o,t1=a2,t3=c2 value=2i 1586052000000000000", points[1])

self.assertEqual(True, (data_frame == original_data).all(axis = None), f'data changed; old:\n{original_data}\nnew:\n{data_frame}')
self.assertEqual(True, (data_frame == original_data).all(axis=None), f'data changed; old:\n{original_data}\nnew:\n{data_frame}')

def test_with_period_index(self):
from influxdb_client.extras import pd
Expand Down Expand Up @@ -333,3 +333,22 @@ def test_write_num_py_floats(self):
self.assertEqual(1, len(points))
self.assertEqual("h2o level=15.5 1586044800000000000", points[0], msg=f'Current type: {np_float_type}')

def test_write_precision(self):
    # Serialize one single-row DataFrame per write precision and check that the
    # timestamp at the end of the resulting line protocol is scaled accordingly.
    from influxdb_client.extras import pd

    timestamp = pd.Timestamp('2020-04-05 00:00+00:00')

    # (precision passed to the serializer, timestamp expected in the output).
    # The final case passes None and expects the same value as nanoseconds.
    expectations = [
        (WritePrecision.NS, 1586044800000000000),
        (WritePrecision.US, 1586044800000000),
        (WritePrecision.MS, 1586044800000),
        (WritePrecision.S, 1586044800),
        (None, 1586044800000000000)
    ]

    for write_precision, expected_timestamp in expectations:
        frame = pd.DataFrame([15], index=[timestamp], columns=['level'])
        points = data_frame_to_list_of_points(data_frame=frame,
                                              data_frame_measurement_name='h2o',
                                              point_settings=PointSettings(),
                                              precision=write_precision)
        # Exactly one row went in, so exactly one line must come out.
        self.assertEqual(1, len(points))
        self.assertEqual(f"h2o level=15i {expected_timestamp}", points[0])

0 comments on commit e0e46cc

Please sign in to comment.