Skip to content

Commit

Permalink
feat: support for writing pandas DataFrame (#79) - default tags
Browse files Browse the repository at this point in the history
  • Loading branch information
rolincova committed Apr 29, 2020
1 parent 212cee3 commit aa49182
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 13 deletions.
30 changes: 17 additions & 13 deletions influxdb_client/client/write_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,42 +306,46 @@ def _write_batching(self, bucket, org, data,

return None

def _data_frame_to_list_of_points(self, dataframe, data_frame_measurement_name, data_frame_tag_columns, precision='s'):
def _data_frame_to_list_of_points(self, data_frame, data_frame_measurement_name, data_frame_tag_columns, precision):
from ..extras import pd
if not isinstance(dataframe, pd.DataFrame):
if not isinstance(data_frame, pd.DataFrame):
raise TypeError('Must be DataFrame, but type was: {0}.'
.format(type(dataframe)))
if not (isinstance(dataframe.index, pd.PeriodIndex) or
isinstance(dataframe.index, pd.DatetimeIndex)):
.format(type(data_frame)))
if not (isinstance(data_frame.index, pd.PeriodIndex) or
isinstance(data_frame.index, pd.DatetimeIndex)):
raise TypeError('Must be DataFrame with DatetimeIndex or \
PeriodIndex.')

if isinstance(dataframe.index, pd.PeriodIndex):
dataframe.index = dataframe.index.to_timestamp()
if isinstance(data_frame.index, pd.PeriodIndex):
data_frame.index = data_frame.index.to_timestamp()
else:
dataframe.index = pd.to_datetime(dataframe.index)
data_frame.index = pd.to_datetime(data_frame.index)

if dataframe.index.tzinfo is None:
dataframe.index = dataframe.index.tz_localize('UTC')
if data_frame.index.tzinfo is None:
data_frame.index = data_frame.index.tz_localize('UTC')

data = []

c = 0
for v in dataframe.values:
for v in data_frame.values:
point = Point(measurement_name=data_frame_measurement_name)

count = 0
for f in v:
column = dataframe.columns[count]
column = data_frame.columns[count]
if data_frame_tag_columns and column in data_frame_tag_columns:
point.tag(column, f)
else:
point.field(column, f)
count += 1

point.time(dataframe.index[c], precision)
point.time(data_frame.index[c], precision)
c += 1

if self._point_settings.defaultTags:
for key, val in self._point_settings.defaultTags.items():
point.tag(key, val)

data.append(point)

return data
Expand Down
38 changes: 38 additions & 0 deletions tests/test_WriteApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,44 @@ def test_use_default_tags_with_dictionaries(self):

self.delete_test_bucket(bucket)

def test_use_default_tags_with_data_frame(self):
from influxdb_client.extras import pd

bucket = self.create_test_bucket()

now = pd.Timestamp('1970-01-01 00:00+00:00')
data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
index=[now + timedelta(hours=1), now + timedelta(hours=2)],
columns=["location", "water_level"])

self.write_client.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet',
data_frame_tag_columns=['location'])

time.sleep(1)

query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)'

flux_result = self.client.query_api().query(query)

self.assertEqual(1, len(flux_result))

records = flux_result[0].records

self.assertEqual(2, len(records))

rec = records[0]
rec2 = records[1]

self.assertEqual(self.id_tag, rec["id"])
self.assertEqual(self.customer_tag, rec["customer"])
self.assertEqual("LA", rec[self.data_center_key])

self.assertEqual(self.id_tag, rec2["id"])
self.assertEqual(self.customer_tag, rec2["customer"])
self.assertEqual("LA", rec2[self.data_center_key])

self.delete_test_bucket(bucket)

def test_write_bytes(self):
bucket = self.create_test_bucket()

Expand Down

0 comments on commit aa49182

Please sign in to comment.