Skip to content

Commit

Permalink
fix: skip DataFrame rows without data (#170)
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar authored Nov 27, 2020
1 parent c8fd3b5 commit ffae906
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 7 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 1.13.0 [unreleased]

### Bug Fixes
1. [#164](https://github.com/influxdata/influxdb-client-python/pull/170): Skip DataFrame rows without data - all fields are nan.


## 1.12.0 [2020-10-30]

1. [#163](https://github.com/influxdata/influxdb-client-python/pull/163): Added support for Python 3.9
Expand Down
23 changes: 18 additions & 5 deletions influxdb_client/client/write/dataframe_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ def _itertuples(data_frame):
return zip(data_frame.index, *cols)


def _is_nan(x):
return x != x


def _any_not_nan(p, indexes):
return any(map(lambda inx: not _is_nan(p[inx]), indexes))


def data_frame_to_list_of_points(data_frame, point_settings, **kwargs):
"""Serialize DataFrame into LineProtocols."""
from ...extras import pd, np
Expand All @@ -61,6 +69,7 @@ def data_frame_to_list_of_points(data_frame, point_settings, **kwargs):

tags = []
fields = []
fields_indexes = []
keys = []

if point_settings.defaultTags:
Expand All @@ -73,14 +82,18 @@ def data_frame_to_list_of_points(data_frame, point_settings, **kwargs):
keys.append(key.translate(_ESCAPE_KEY))
key_format = f'{{keys[{index}]}}'

index_value = index + 1
if key in data_frame_tag_columns:
tags.append({'key': key, 'value': f"{key_format}={{str(p[{index + 1}]).translate(_ESCAPE_KEY)}}"})
tags.append({'key': key, 'value': f"{key_format}={{str(p[{index_value}]).translate(_ESCAPE_KEY)}}"})
elif issubclass(value.type, np.integer):
fields.append(f"{key_format}={{p[{index + 1}]}}i")
fields.append(f"{key_format}={{p[{index_value}]}}i")
fields_indexes.append(index_value)
elif issubclass(value.type, (np.float, np.bool_)):
fields.append(f"{key_format}={{p[{index + 1}]}}")
fields.append(f"{key_format}={{p[{index_value}]}}")
fields_indexes.append(index_value)
else:
fields.append(f"{key_format}=\"{{str(p[{index + 1}]).translate(_ESCAPE_STRING)}}\"")
fields.append(f"{key_format}=\"{{str(p[{index_value}]).translate(_ESCAPE_STRING)}}\"")
fields_indexes.append(index_value)

tags.sort(key=lambda x: x['key'])
tags = ','.join(map(lambda y: y['value'], tags))
Expand All @@ -100,7 +113,7 @@ def data_frame_to_list_of_points(data_frame, point_settings, **kwargs):
if isnull.any():
rep = _replace(data_frame)
lp = (reduce(lambda a, b: re.sub(*b, a), rep, f(p))
for p in _itertuples(data_frame))
for p in filter(lambda x: _any_not_nan(x, fields_indexes), _itertuples(data_frame)))
return list(lp)
else:
return list(map(f, _itertuples(data_frame)))
5 changes: 3 additions & 2 deletions tests/test_WriteApiDataFrame.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,10 @@ def test_write_nan(self):
data_frame = pd.DataFrame(data=[[3.1955, np.nan, 20.514305, np.nan],
[5.7310, np.nan, 23.328710, np.nan],
[np.nan, 3.138664, np.nan, 20.755026],
[5.7310, 5.139563, 23.328710, 19.791240]],
[5.7310, 5.139563, 23.328710, 19.791240],
[np.nan, np.nan, np.nan, np.nan]],
index=[now, now + timedelta(minutes=30), now + timedelta(minutes=60),
now + timedelta(minutes=90)],
now + timedelta(minutes=90), now + timedelta(minutes=120)],
columns=["actual_kw_price", "forecast_kw_price", "actual_general_use",
"forecast_general_use"])

Expand Down

0 comments on commit ffae906

Please sign in to comment.