
feat: support for writing pandas DataFrame #88

Merged 7 commits on May 4, 2020
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
## 1.7.0 [unreleased]

### Features
1. [#79](https://github.com/influxdata/influxdb-client-python/issues/79): Added support for writing Pandas DataFrame

### Bug Fixes
1. [#85](https://github.com/influxdata/influxdb-client-python/issues/85): Fixed the possibility of generating an empty write batch

12 changes: 12 additions & 0 deletions README.rst
@@ -49,6 +49,7 @@ InfluxDB 2.0 client features
- `Line Protocol <https://docs.influxdata.com/influxdb/v1.6/write_protocols/line_protocol_tutorial>`_
- `Data Point <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write/point.py#L16>`__
- `RxPY <https://rxpy.readthedocs.io/en/latest/>`__ Observable
- `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
- `How to write <#writes>`_
- `InfluxDB 2.0 API <https://github.com/influxdata/influxdb/blob/master/http/swagger.yml>`_ client for management
- the client is generated from the `swagger <https://github.com/influxdata/influxdb/blob/master/http/swagger.yml>`_ by using the `openapi-generator <https://github.com/OpenAPITools/openapi-generator>`_
@@ -219,6 +220,7 @@ The data could be written as
3. Dictionary style mapping with keys: ``measurement``, ``tags``, ``fields`` and ``time``
4. List of above items
5. A ``batching`` type of write also supports an ``Observable`` that produces any of the above items
6. `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
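For example, a dictionary-style record uses the keys above (a minimal sketch; the measurement, tag, and field names are illustrative):

_dict_record = {"measurement": "h2o_feet",
                "tags": {"location": "coyote_creek"},
                "fields": {"water_level": 1.0},
                "time": 1}
_write_client.write("my-bucket", "my-org", _dict_record)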


Batching
@@ -302,6 +304,16 @@ The batching is configurable by ``write_options``\ :

_write_client.write("my-bucket", "my-org", _data)

"""
Write Pandas DataFrame
"""
from datetime import timedelta

from influxdb_client.extras import pd

_now = pd.Timestamp.now('UTC')
_data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
                           index=[_now, _now + timedelta(hours=1)],
                           columns=["location", "water_level"])

_write_client.write("my-bucket", "my-org", record=_data_frame, data_frame_measurement_name='h2o_feet',
                    data_frame_tag_columns=['location'])
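"""
Each DataFrame row above becomes one line-protocol point; a sketch of the
serialized output (timestamps shown as placeholders):

h2o_feet,location=coyote_creek water_level=1.0 <timestamp>
h2o_feet,location=coyote_creek water_level=2.0 <timestamp>
"""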

"""
Close client
89 changes: 74 additions & 15 deletions influxdb_client/client/write_api.py
@@ -183,21 +183,23 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions()
def write(self, bucket: str, org: str = None,
record: Union[
str, List['str'], Point, List['Point'], dict, List['dict'], bytes, List['bytes'], Observable] = None,
write_precision: WritePrecision = DEFAULT_WRITE_PRECISION) -> None:
write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs) -> None:
"""
Writes time-series data into influxdb.

:param str org: specifies the destination organization for writes; takes either the ID or name interchangeably; if both orgID and org are specified, org takes precedence. (required)
:param str bucket: specifies the destination bucket for writes (required)
:param WritePrecision write_precision: specifies the precision for the unix timestamps within the body line-protocol
:param record: Points, line protocol, RxPY Observable to write
:param record: Points, line protocol, Pandas DataFrame, RxPY Observable to write
:param data_frame_measurement_name: name of measurement for writing Pandas DataFrame
:param data_frame_tag_columns: list of DataFrame columns which are tags; the remaining columns will be fields

"""

if org is None:
org = self._influxdb_client.org

if self._point_settings.defaultTags and record:
if self._point_settings.defaultTags and record is not None:
for key, val in self._point_settings.defaultTags.items():
if isinstance(record, dict):
record.get("tags")[key] = val
@@ -209,9 +211,10 @@ def write(self, bucket: str, org: str = None,
r.tag(key, val)

if self._write_options.write_type is WriteType.batching:
return self._write_batching(bucket, org, record, write_precision)
return self._write_batching(bucket, org, record,
write_precision, **kwargs)

final_string = self._serialize(record, write_precision)
final_string = self._serialize(record, write_precision, **kwargs)

_async_req = self._write_options.write_type == WriteType.asynchronous

@@ -235,7 +238,7 @@ def __del__(self):
self._disposable = None
pass

def _serialize(self, record, write_precision) -> bytes:
def _serialize(self, record, write_precision, **kwargs) -> bytes:
_result = b''
if isinstance(record, bytes):
_result = record
@@ -244,40 +247,96 @@ def _serialize(self, record, write_precision) -> bytes:
_result = record.encode("utf-8")

elif isinstance(record, Point):
_result = self._serialize(record.to_line_protocol(), write_precision=write_precision)
_result = self._serialize(record.to_line_protocol(), write_precision, **kwargs)

elif isinstance(record, dict):
_result = self._serialize(Point.from_dict(record, write_precision=write_precision),
write_precision=write_precision)
write_precision, **kwargs)
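# The class name is checked here (instead of isinstance) so that pandas is
# only imported when a DataFrame is actually written; see the lazy import
# in _data_frame_to_list_of_points.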
elif 'DataFrame' in type(record).__name__:
_result = self._serialize(self._data_frame_to_list_of_points(record,
precision=write_precision, **kwargs),
write_precision,
**kwargs)

elif isinstance(record, list):
_result = b'\n'.join([self._serialize(item, write_precision=write_precision) for item in record])
_result = b'\n'.join([self._serialize(item, write_precision,
**kwargs) for item in record])

return _result

def _write_batching(self, bucket, org, data, precision=DEFAULT_WRITE_PRECISION):
def _write_batching(self, bucket, org, data,
precision=DEFAULT_WRITE_PRECISION,
**kwargs):
_key = _BatchItemKey(bucket, org, precision)
if isinstance(data, bytes):
self._subject.on_next(_BatchItem(key=_key, data=data))

elif isinstance(data, str):
self._write_batching(bucket, org, data.encode("utf-8"), precision)
self._write_batching(bucket, org, data.encode("utf-8"),
precision, **kwargs)

elif isinstance(data, Point):
self._write_batching(bucket, org, data.to_line_protocol(), precision)
self._write_batching(bucket, org, data.to_line_protocol(),
precision, **kwargs)

elif isinstance(data, dict):
self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision), precision)
self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision),
precision, **kwargs)

elif 'DataFrame' in type(data).__name__:
self._write_batching(bucket, org, self._data_frame_to_list_of_points(data, precision, **kwargs),
precision, **kwargs)

elif isinstance(data, list):
for item in data:
self._write_batching(bucket, org, item, precision)
self._write_batching(bucket, org, item, precision, **kwargs)

elif isinstance(data, Observable):
data.subscribe(lambda it: self._write_batching(bucket, org, it, precision))
data.subscribe(lambda it: self._write_batching(bucket, org, it, precision, **kwargs))
pass

return None

def _data_frame_to_list_of_points(self, data_frame, precision, **kwargs):
from ..extras import pd
if not isinstance(data_frame, pd.DataFrame):
raise TypeError('Must be DataFrame, but type was: {0}.'
.format(type(data_frame)))

if 'data_frame_measurement_name' not in kwargs:
raise TypeError('"data_frame_measurement_name" is a Required Argument')

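# Normalize the index to timezone-aware timestamps: a PeriodIndex is
# converted to a DatetimeIndex, anything else is coerced via pd.to_datetime,
# and naive timestamps are localized to UTC.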
if isinstance(data_frame.index, pd.PeriodIndex):
data_frame.index = data_frame.index.to_timestamp()
else:
data_frame.index = pd.to_datetime(data_frame.index)

if data_frame.index.tzinfo is None:
data_frame.index = data_frame.index.tz_localize('UTC')

data = []

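# Build one Point per DataFrame row: columns listed in
# 'data_frame_tag_columns' become tags, all other columns become fields,
# and the row's index entry supplies the timestamp.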
for c, row in enumerate(data_frame.values):
point = Point(measurement_name=kwargs.get('data_frame_measurement_name'))

for count, value in enumerate(row):
column = data_frame.columns[count]
data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
if data_frame_tag_columns and column in data_frame_tag_columns:
point.tag(column, value)
else:
point.field(column, value)

point.time(data_frame.index[c], precision)

if self._point_settings.defaultTags:
for key, val in self._point_settings.defaultTags.items():
point.tag(key, val)

data.append(point)

return data

def _http(self, batch_item: _BatchItem):

logger.debug("Write time series data into InfluxDB: %s", batch_item)
90 changes: 90 additions & 0 deletions tests/test_WriteApi.py
@@ -6,6 +6,7 @@
import os
import unittest
import time
from datetime import timedelta
from multiprocessing.pool import ApplyResult

from influxdb_client import Point, WritePrecision, InfluxDBClient
@@ -224,6 +225,57 @@ def test_write_bytes(self):

self.delete_test_bucket(_bucket)

def test_write_data_frame(self):
from influxdb_client.extras import pd

bucket = self.create_test_bucket()

now = pd.Timestamp('1970-01-01 00:00+00:00')
data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
index=[now + timedelta(hours=1), now + timedelta(hours=2)],
columns=["location", "water_level"])

self.write_client.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet',
data_frame_tag_columns=['location'])

result = self.query_api.query(
"from(bucket:\"" + bucket.name + "\") |> range(start: 1970-01-01T00:00:00.000000001Z)", self.org)

self.assertEqual(1, len(result))
self.assertEqual(2, len(result[0].records))

self.assertEqual(result[0].records[0].get_measurement(), "h2o_feet")
self.assertEqual(result[0].records[0].get_value(), 1.0)
self.assertEqual(result[0].records[0].values.get("location"), "coyote_creek")
self.assertEqual(result[0].records[0].get_field(), "water_level")
self.assertEqual(result[0].records[0].get_time(),
datetime.datetime(1970, 1, 1, 1, 0, tzinfo=datetime.timezone.utc))

self.assertEqual(result[0].records[1].get_measurement(), "h2o_feet")
self.assertEqual(result[0].records[1].get_value(), 2.0)
self.assertEqual(result[0].records[1].values.get("location"), "coyote_creek")
self.assertEqual(result[0].records[1].get_field(), "water_level")
self.assertEqual(result[0].records[1].get_time(),
datetime.datetime(1970, 1, 1, 2, 0, tzinfo=datetime.timezone.utc))

self.delete_test_bucket(bucket)

def test_write_data_frame_without_measurement_name(self):
from influxdb_client.extras import pd

bucket = self.create_test_bucket()

now = pd.Timestamp('1970-01-01 00:00+00:00')
data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
index=[now + timedelta(hours=1), now + timedelta(hours=2)],
columns=["location", "water_level"])

with self.assertRaises(TypeError) as cm:
self.write_client.write(bucket.name, record=data_frame)
exception = cm.exception

self.assertEqual('"data_frame_measurement_name" is a Required Argument', exception.__str__())

def test_use_default_org(self):
bucket = self.create_test_bucket()

@@ -362,6 +414,44 @@ def test_use_default_tags_with_dictionaries(self):

self.delete_test_bucket(bucket)

def test_use_default_tags_with_data_frame(self):
from influxdb_client.extras import pd

bucket = self.create_test_bucket()

now = pd.Timestamp('1970-01-01 00:00+00:00')
data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
index=[now + timedelta(hours=1), now + timedelta(hours=2)],
columns=["location", "water_level"])

self.write_client.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet',
data_frame_tag_columns=['location'])

time.sleep(1)

query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)'

flux_result = self.client.query_api().query(query)

self.assertEqual(1, len(flux_result))

records = flux_result[0].records

self.assertEqual(2, len(records))

rec = records[0]
rec2 = records[1]

self.assertEqual(self.id_tag, rec["id"])
self.assertEqual(self.customer_tag, rec["customer"])
self.assertEqual("LA", rec[self.data_center_key])

self.assertEqual(self.id_tag, rec2["id"])
self.assertEqual(self.customer_tag, rec2["customer"])
self.assertEqual("LA", rec2[self.data_center_key])

self.delete_test_bucket(bucket)

def test_write_bytes(self):
bucket = self.create_test_bucket()

27 changes: 27 additions & 0 deletions tests/test_WriteApiBatching.py
@@ -407,6 +407,33 @@ def test_to_low_flush_interval(self):

httpretty.reset()

def test_batching_data_frame(self):
from influxdb_client.extras import pd

httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204)
httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204)

data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0],
["coyote_creek", 3.0], ["coyote_creek", 4.0]],
index=[1, 2, 3, 4],
columns=["location", "level water_level"])

self._write_client.write("my-bucket", "my-org", record=data_frame,
data_frame_measurement_name='h2o_feet',
data_frame_tag_columns=['location'])

time.sleep(1)

_requests = httpretty.httpretty.latest_requests

self.assertEqual(2, len(_requests))
_request1 = "h2o_feet,location=coyote_creek level\\ water_level=1.0 1\n" \
"h2o_feet,location=coyote_creek level\\ water_level=2.0 2"
_request2 = "h2o_feet,location=coyote_creek level\\ water_level=3.0 3\n" \
"h2o_feet,location=coyote_creek level\\ water_level=4.0 4"

self.assertEqual(_request1, _requests[0].parsed_body)
self.assertEqual(_request2, _requests[1].parsed_body)

if __name__ == '__main__':
unittest.main()