Skip to content

Commit

Permalink
feat: Add query_data_frame_stream method (#121)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaheicanaan authored Jun 30, 2020
1 parent 1bb088f commit 3e38699
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Features
1. [#112](https://github.com/influxdata/influxdb-client-python/pull/113): Support timestamp with different timezone in _convert_timestamp
1. [#120](https://github.com/influxdata/influxdb-client-python/pull/120): ciso8601 is an optional dependency and has to be installed separably
1. [#121](https://github.com/influxdata/influxdb-client-python/pull/121): Added query_data_frame_stream method

### Bug Fixes
1. [#117](https://github.com/influxdata/influxdb-client-python/pull/117): Fixed appending default tags for single Point
Expand Down
30 changes: 22 additions & 8 deletions influxdb_client/client/query_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,27 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N

from ..extras import pd

_generator = self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index)
_dataFrames = list(_generator)

if len(_dataFrames) == 0:
return pd.DataFrame(columns=[], index=None)
elif len(_dataFrames) == 1:
return _dataFrames[0]
else:
return _dataFrames

def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[str] = None):
"""
Synchronously executes the Flux query and return stream of Pandas DataFrame as a Generator['pd.DataFrame'].
Note that if a query returns more then one table than the client generates a DataFrame for each of them.
:param query: the Flux query
:param org: organization name (optional if already specified in InfluxDBClient)
:param data_frame_index: the list of columns that are used as DataFrame index
:return:
"""

if org is None:
org = self._influxdb_client.org

Expand All @@ -113,14 +134,7 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N

_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame,
data_frame_index=data_frame_index)
_dataFrames = list(_parser.generator())

if len(_dataFrames) == 0:
return pd.DataFrame(columns=[], index=None)
elif len(_dataFrames) == 1:
return _dataFrames[0]
else:
return _dataFrames
return _parser.generator()

# private helper for c
@staticmethod
Expand Down

0 comments on commit 3e38699

Please sign in to comment.