Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: supports write_precision for write Pandas DataFrame #270

Merged
merged 2 commits into from
Jun 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
### Deprecated
1. [#264](https://github.com/influxdata/influxdb-client-python/pull/264): Deprecated the `org_id` option of `BucketsApi.create_bucket` in favor of the `org` parameter

### Bug Fixes
1. [#270](https://github.com/influxdata/influxdb-client-python/pull/270): Support `write_precision` when writing a Pandas DataFrame

## 1.18.0 [2021-06-04]

### Breaking Changes
Expand Down
21 changes: 18 additions & 3 deletions influxdb_client/client/write/dataframe_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import re
import math

from influxdb_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, _ESCAPE_MEASUREMENT
from influxdb_client import WritePrecision
from influxdb_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, _ESCAPE_MEASUREMENT, DEFAULT_WRITE_PRECISION


def _itertuples(data_frame):
Expand All @@ -23,8 +24,16 @@ def _any_not_nan(p, indexes):
return any(map(lambda x: _not_nan(p[x]), indexes))


def data_frame_to_list_of_points(data_frame, point_settings, **kwargs):
"""Serialize DataFrame into LineProtocols."""
def data_frame_to_list_of_points(data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION, **kwargs):
"""
Serialize DataFrame into LineProtocols.

:param data_frame: Pandas DataFrame to serialize
:param point_settings: Default Tags
:param precision: The precision for the unix timestamps within the body line-protocol.
:key data_frame_measurement_name: name of measurement for writing Pandas DataFrame
:key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields
"""
# This function is hard to understand but for good reason:
# the approach used here is considerably more efficient
# than the alternatives.
Expand Down Expand Up @@ -179,6 +188,12 @@ def data_frame_to_list_of_points(data_frame, point_settings, **kwargs):
tags = ''.join(tags)
fields = ''.join(fields)
timestamp = '{p[0].value}'
if precision == WritePrecision.US:
timestamp = '{int(p[0].value / 1e3)}'
elif precision == WritePrecision.MS:
timestamp = '{int(p[0].value / 1e6)}'
elif precision == WritePrecision.S:
timestamp = '{int(p[0].value / 1e9)}'

f = eval(f'lambda p: f"""{{measurement_name}}{tags} {fields} {timestamp}"""', {
'measurement_name': measurement_name,
Expand Down
5 changes: 3 additions & 2 deletions influxdb_client/client/write_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ def _serialize(self, record, write_precision, payload, **kwargs):
self._serialize(Point.from_dict(record, write_precision=write_precision),
write_precision, payload, **kwargs)
elif 'DataFrame' in type(record).__name__:
_data = data_frame_to_list_of_points(record, self._point_settings, **kwargs)
_data = data_frame_to_list_of_points(record, self._point_settings, write_precision, **kwargs)
self._serialize(_data, write_precision, payload, **kwargs)

elif isinstance(record, Iterable):
Expand All @@ -338,7 +338,8 @@ def _write_batching(self, bucket, org, data,
precision, **kwargs)

elif 'DataFrame' in type(data).__name__:
self._write_batching(bucket, org, data_frame_to_list_of_points(data, self._point_settings, **kwargs),
self._write_batching(bucket, org,
data_frame_to_list_of_points(data, self._point_settings, precision, **kwargs),
precision, **kwargs)

elif isinstance(data, Iterable):
Expand Down
33 changes: 26 additions & 7 deletions tests/test_WriteApiDataFrame.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import unittest
from datetime import timedelta

from influxdb_client import InfluxDBClient, WriteOptions, WriteApi
from influxdb_client import InfluxDBClient, WriteOptions, WriteApi, WritePrecision
from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points
from influxdb_client.client.write_api import SYNCHRONOUS, PointSettings
from tests.base_test import BaseTest
Expand Down Expand Up @@ -56,8 +56,8 @@ class DataSerializerTest(unittest.TestCase):
def test_convert_data_frame(self):
from influxdb_client.extras import pd, np

num_rows=1500000
col_data={
num_rows = 1500000
col_data = {
'time': np.arange(0, num_rows, 1, dtype=int),
'col1': np.random.choice(['test_a', 'test_b', 'test_c'], size=(num_rows,)),
}
Expand All @@ -69,8 +69,8 @@ def test_convert_data_frame(self):

start = time.time()
data_frame_to_list_of_points(data_frame, PointSettings(),
data_frame_measurement_name='h2o_feet',
data_frame_tag_columns=['location'])
data_frame_measurement_name='h2o_feet',
data_frame_tag_columns=['location'])

print("Time elapsed: ", (time.time() - start))

Expand Down Expand Up @@ -291,7 +291,7 @@ def test_with_default_tags(self):
self.assertEqual("h2o,t1=a2,t2=every,t3=c2 value=2i 1586052000000000000", points[1])

# Check that the data frame hasn't been changed (an earlier version did change it)
self.assertEqual(True, (data_frame == original_data).all(axis = None), f'data changed; old:\n{original_data}\nnew:\n{data_frame}')
self.assertEqual(True, (data_frame == original_data).all(axis=None), f'data changed; old:\n{original_data}\nnew:\n{data_frame}')

# Check that the default tags won't override actual column data.
# This is arguably incorrect behavior, but it's how it works currently.
Expand All @@ -304,7 +304,7 @@ def test_with_default_tags(self):
self.assertEqual("h2o,t1=a1,t3=c1 value=1i 1586048400000000000", points[0])
self.assertEqual("h2o,t1=a2,t3=c2 value=2i 1586052000000000000", points[1])

self.assertEqual(True, (data_frame == original_data).all(axis = None), f'data changed; old:\n{original_data}\nnew:\n{data_frame}')
self.assertEqual(True, (data_frame == original_data).all(axis=None), f'data changed; old:\n{original_data}\nnew:\n{data_frame}')

def test_with_period_index(self):
from influxdb_client.extras import pd
Expand Down Expand Up @@ -333,3 +333,22 @@ def test_write_num_py_floats(self):
self.assertEqual(1, len(points))
self.assertEqual("h2o level=15.5 1586044800000000000", points[0], msg=f'Current type: {np_float_type}')

def test_write_precision(self):
    """
    Check that the DataFrame serializer honours the requested write precision.

    One fixed instant (2020-04-05 00:00 UTC) is serialized once per precision and the
    resulting line protocol is compared with the expected timestamp for that precision.
    Passing ``None`` is expected to yield the same nanosecond timestamp as ``WritePrecision.NS``.
    """
    from influxdb_client.extras import pd
    now = pd.Timestamp('2020-04-05 00:00+00:00')
    # Each entry: (precision handed to the serializer, expected timestamp in the line protocol).
    precisions = [
        (WritePrecision.NS, 1586044800000000000),
        (WritePrecision.US, 1586044800000000),
        (WritePrecision.MS, 1586044800000),
        (WritePrecision.S, 1586044800),
        (None, 1586044800000000000),
    ]

    for precision, expected_timestamp in precisions:
        # subTest lets the remaining precisions run after a failure and labels
        # every failure with the precision that produced it.
        with self.subTest(precision=precision):
            data_frame = pd.DataFrame([15], index=[now], columns=['level'])
            points = data_frame_to_list_of_points(data_frame=data_frame,
                                                  data_frame_measurement_name='h2o',
                                                  point_settings=PointSettings(),
                                                  precision=precision)
            self.assertEqual(1, len(points))
            self.assertEqual(f"h2o level=15i {expected_timestamp}", points[0])