Skip to content

Commit

Permalink
feat: support for writing pandas DataFrame (#79) - batching
Browse files Browse the repository at this point in the history
  • Loading branch information
rolincova committed Apr 29, 2020
1 parent baf6951 commit 0cacefc
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 5 deletions.
7 changes: 2 additions & 5 deletions influxdb_client/client/write_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ def write(self, bucket: str, org: str = None,
r.tag(key, val)

if self._write_options.write_type is WriteType.batching:
return self._write_batching(bucket, org, record, write_precision)
return self._write_batching(bucket, org, record, data_frame_measurement_name, data_frame_tag_columns,
write_precision)

final_string = self._serialize(record, write_precision,
data_frame_measurement_name,
Expand Down Expand Up @@ -311,10 +312,6 @@ def _data_frame_to_list_of_points(self, data_frame, data_frame_measurement_name,
if not isinstance(data_frame, pd.DataFrame):
raise TypeError('Must be DataFrame, but type was: {0}.'
.format(type(data_frame)))
if not (isinstance(data_frame.index, pd.PeriodIndex) or
isinstance(data_frame.index, pd.DatetimeIndex)):
raise TypeError('Must be DataFrame with DatetimeIndex or \
PeriodIndex.')

if isinstance(data_frame.index, pd.PeriodIndex):
data_frame.index = data_frame.index.to_timestamp()
Expand Down
27 changes: 27 additions & 0 deletions tests/test_WriteApiBatching.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,33 @@ def test_to_low_flush_interval(self):

httpretty.reset()

def test_batching_data_frame(self):
from influxdb_client.extras import pd

httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204)
httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204)

data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0],
["coyote_creek", 3.0], ["coyote_creek", 4.0]],
index=[1, 2, 3, 4],
columns=["location", "level water_level"])

self._write_client.write("my-bucket", "my-org", record=data_frame,
data_frame_measurement_name='h2o_feet',
data_frame_tag_columns=['location'])

time.sleep(1)

_requests = httpretty.httpretty.latest_requests

self.assertEqual(2, len(_requests))
_request1 = "h2o_feet,location=coyote_creek level\\ water_level=1.0 1\n" \
"h2o_feet,location=coyote_creek level\\ water_level=2.0 2"
_request2 = "h2o_feet,location=coyote_creek level\\ water_level=3.0 3\n" \
"h2o_feet,location=coyote_creek level\\ water_level=4.0 4"

self.assertEqual(_request1, _requests[0].parsed_body)
self.assertEqual(_request2, _requests[1].parsed_body)

if __name__ == '__main__':
unittest.main()

0 comments on commit 0cacefc

Please sign in to comment.