Skip to content

Commit

Permalink
fix: correct serialization empty columns into LineProtocol [DataFrame] (#359)
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar authored Nov 11, 2021
1 parent 81e7d21 commit 500835f
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 3 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
- `OrganizationsApi` - add possibility to: `update`
- `UsersApi` - add possibility to: `update`, `delete`, `find`

### Bug Fixes
1. [#359](https://github.com/influxdata/influxdb-client-python/pull/359): Correct serialization of empty columns into LineProtocol [DataFrame]

## 1.23.0 [2021-10-26]

### Deprecates
Expand Down
11 changes: 8 additions & 3 deletions influxdb_client/client/write/dataframe_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ def _any_not_nan(p, indexes):
return any(map(lambda x: _not_nan(p[x]), indexes))


# Placeholder written into the generated line in place of a field whose value
# is NaN. The serializer later strips it, together with one adjacent comma if
# present, so that no empty field and no dangling separator is left behind.
_EMPTY_EXPRESSION = "_EMPTY_LINE_PROTOCOL_PART_"


class DataframeSerializer:
"""Serialize DataFrame into LineProtocols."""

Expand Down Expand Up @@ -177,13 +180,15 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
field_value = f'{sep}{key_format}={{{val_format}}}'
elif issubclass(value.type, np.floating):
if null_columns[index]:
field_value = f"""{{"" if math.isnan({val_format}) else f"{sep}{key_format}={{{val_format}}}"}}"""
field_value = f"""{{
"{sep}{_EMPTY_EXPRESSION}" if math.isnan({val_format}) else f"{sep}{key_format}={{{val_format}}}"
}}"""
else:
field_value = f'{sep}{key_format}={{{val_format}}}'
else:
if null_columns[index]:
field_value = f"""{{
'' if type({val_format}) == float and math.isnan({val_format}) else
'{sep}{_EMPTY_EXPRESSION}' if type({val_format}) == float and math.isnan({val_format}) else
f'{sep}{key_format}="{{str({val_format}).translate(_ESCAPE_STRING)}}"'
}}"""
else:
Expand Down Expand Up @@ -244,7 +249,7 @@ def serialize(self, chunk_idx: int = None):
if self.first_field_maybe_null:
# When the first field is null (None/NaN), we'll have
# a spurious leading comma which needs to be removed.
lp = (re.sub('^((\\ |[^ ])* ),', '\\1', self.f(p))
lp = (re.sub(f",{_EMPTY_EXPRESSION}|{_EMPTY_EXPRESSION},|{_EMPTY_EXPRESSION}", '', self.f(p))
for p in filter(lambda x: _any_not_nan(x, self.field_indexes), _itertuples(chunk)))
return list(lp)
else:
Expand Down
22 changes: 22 additions & 0 deletions tests/test_WriteApiDataFrame.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import time
import unittest
from datetime import timedelta
from io import StringIO

from influxdb_client import InfluxDBClient, WriteOptions, WriteApi, WritePrecision
from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points, DataframeSerializer
Expand Down Expand Up @@ -374,6 +375,27 @@ def test_index_not_periodIndex_respect_write_precision(self):
self.assertEqual(1, len(points))
self.assertEqual(f"h2o level=15i {precision[1]}", points[0])

def test_serialize_strings_with_commas(self):
    """Check that commas inside string field values survive serialization.

    The fixture is a ';'-separated CSV whose ``Note`` values contain commas
    and whose ``Person``, ``Counter Account`` and ``Group`` cells are empty.
    The expected line protocol keeps the notes intact and contains no entry
    for the empty columns.
    """
    from influxdb_client.extras import pd

    raw_csv = StringIO("""sep=;
Date;Entry Type;Value;Currencs;Category;Person;Account;Counter Account;Group;Note;Recurring;
"01.10.2018";"Expense";"-1,00";"EUR";"Testcategory";"";"Testaccount";"";"";"This, works";"no";
"02.10.2018";"Expense";"-1,00";"EUR";"Testcategory";"";"Testaccount";"";"";"This , works not";"no";
""")

    # Skip the leading "sep=;" hint row; values use ',' as the decimal mark.
    frame = pd.read_csv(raw_csv, sep=";", skiprows=1, decimal=",", encoding="utf-8")
    frame['Date'] = pd.to_datetime(frame['Date'], format="%d.%m.%Y")
    frame = frame.set_index('Date')

    tag_columns = ['Entry Type', 'Category', 'Person', 'Account']
    lines = data_frame_to_list_of_points(data_frame=frame,
                                         data_frame_measurement_name="bookings",
                                         data_frame_tag_columns=tag_columns,
                                         point_settings=PointSettings())

    expected_first = "bookings,Account=Testaccount,Category=Testcategory,Entry\\ Type=Expense Currencs=\"EUR\",Note=\"This, works\",Recurring=\"no\",Value=-1.0 1538352000000000000"
    expected_second = "bookings,Account=Testaccount,Category=Testcategory,Entry\\ Type=Expense Currencs=\"EUR\",Note=\"This , works not\",Recurring=\"no\",Value=-1.0 1538438400000000000"

    self.assertEqual(2, len(lines))
    self.assertEqual(expected_first, lines[0])
    self.assertEqual(expected_second, lines[1])


class DataSerializerChunksTest(unittest.TestCase):
def test_chunks(self):
Expand Down

0 comments on commit 500835f

Please sign in to comment.