
Commit

feat: Add possibility to specify DataFrame index (#29)
bednar committed Nov 13, 2019
1 parent 9e96c58 commit 41ee594
Showing 5 changed files with 156 additions and 620 deletions.
24 changes: 16 additions & 8 deletions influxdb_client/client/flux_csv_parser.py
@@ -2,6 +2,7 @@
import codecs
import csv as csv_parser
from enum import Enum
from typing import List

import ciso8601
from pandas import DataFrame
@@ -28,10 +29,12 @@ class FluxSerializationMode(Enum):

class FluxCsvParser(object):

def __init__(self, response: HTTPResponse, serialization_mode: FluxSerializationMode) -> None:
def __init__(self, response: HTTPResponse, serialization_mode: FluxSerializationMode,
data_frame_index: List[str] = None) -> None:
self._response = response
self.tables = []
self._serialization_mode = serialization_mode
self._data_frame_index = data_frame_index
pass

def __enter__(self):
@@ -74,8 +77,8 @@ def _parse_flux_response(self):
if "#datatype" == token:

# Return already parsed DataFrame
if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_dataFrame'):
yield self._dataFrame
if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_data_frame'):
yield self._prepare_data_frame()

start_new_table = True
table = FluxTable()
@@ -101,9 +104,9 @@ def _parse_flux_response(self):
start_new_table = False
# Create DataFrame with default values
if self._serialization_mode is FluxSerializationMode.dataFrame:
self._dataFrame = DataFrame(data=[], columns=[], index=None)
self._data_frame = DataFrame(data=[], columns=[], index=None)
for column in table.columns:
self._dataFrame[column.label] = column.default_value
self._data_frame[column.label] = column.default_value
pass
continue

@@ -127,15 +130,20 @@ def _parse_flux_response(self):
yield flux_record

if self._serialization_mode is FluxSerializationMode.dataFrame:
self._dataFrame.loc[len(self._dataFrame.index)] = flux_record.values
self._data_frame.loc[len(self._data_frame.index)] = flux_record.values
pass

# debug
# print(flux_record)

# Return latest DataFrame
if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_dataFrame'):
yield self._dataFrame
if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_data_frame'):
yield self._prepare_data_frame()

def _prepare_data_frame(self):
if self._data_frame_index:
self._data_frame = self._data_frame.set_index(self._data_frame_index)
return self._data_frame

def parse_record(self, table_index, table, csv):
record = FluxRecord(table_index)
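
For reference, a minimal standalone sketch of what the new _prepare_data_frame step amounts to: once all rows have been appended, a supplied data_frame_index promotes the listed columns to the DataFrame index via pandas' set_index. The sample rows and timestamps below are illustrative only, not taken from the commit.

from pandas import DataFrame

data_frame = DataFrame(data=[
    {'_time': '2019-11-12T08:09:00Z', '_value': 1.25},
    {'_time': '2019-11-12T08:09:05Z', '_value': 1.50},
])

# Mirrors FluxCsvParser._prepare_data_frame: only set the index when one was requested.
data_frame_index = ['_time']
if data_frame_index:
    data_frame = data_frame.set_index(data_frame_index)

# Rows are now addressable by time, e.g. data_frame.loc['2019-11-12T08:09:00Z'].
print(data_frame)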
8 changes: 5 additions & 3 deletions influxdb_client/client/query_api.py
@@ -92,13 +92,14 @@ def query_stream(self, query: str, org=None) -> Generator['FluxRecord', Any, None]:

return _parser.generator()

def query_data_frame(self, query: str, org=None):
def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = None):
"""
Synchronously executes the Flux query and returns a Pandas DataFrame.
Note that if a query returns more then one table than the client generates a dataframe for each of them.
Note that if a query returns more than one table, the client generates a DataFrame for each of them.
:param query: the Flux query
:param org: organization name (optional if already specified in InfluxDBClient)
:param data_frame_index: the list of columns that are used as DataFrame index
:return:
"""
if org is None:
@@ -107,7 +108,8 @@ def query_data_frame(self, query: str, org=None):
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
async_req=False, _preload_content=False, _return_http_data_only=False)

_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame)
_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame,
data_frame_index=data_frame_index)
_dataFrames = list(_parser.generator())

if len(_dataFrames) == 1:
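
Taken together, the change above lets a caller ask for an indexed DataFrame directly. A minimal usage sketch, assuming an InfluxDB 2.0 instance at http://localhost:9999 and a bucket named "my-bucket" (both taken from the notebook below); the Flux query itself is illustrative.

from influxdb_client import InfluxDBClient

client = InfluxDBClient(url='http://localhost:9999', token='my-token', org='my-org')

query = 'from(bucket: "my-bucket") ' \
        '|> range(start: -5m) ' \
        '|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")'

# Without data_frame_index the result keeps pandas' default integer index;
# with it, the listed columns become the DataFrame index.
data_frame = client.query_api().query_data_frame(query, data_frame_index=['_time'])

# A single table yields a single DataFrame; a query returning several tables
# yields a list with one DataFrame per table.
print(data_frame)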
60 changes: 29 additions & 31 deletions notebooks/realtime-stream.ipynb
@@ -35,6 +35,7 @@
"outputs": [],
"source": [
"from datetime import timedelta\n",
"from typing import List\n",
"\n",
"import hvplot.streamz\n",
"import pandas as pd\n",
@@ -52,15 +53,15 @@
"metadata": {},
"outputs": [],
"source": [
"def source_data(auto_refresh: int, sink: Stream):\n",
"def source_data(auto_refresh: int, query: str, sink: Stream):\n",
" rx \\\n",
" .interval(period=timedelta(seconds=auto_refresh)) \\\n",
" .pipe(ops.map(lambda start: f'from(bucket: \"my-bucket\") '\n",
" f'|> range(start: -{auto_refresh}s, stop: now()) '\n",
" f'|> filter(fn: (r) => (r._measurement == \"cpu\") or (r._measurement == \"mem\")) ')) \\\n",
" .pipe(ops.map(lambda query: client.query_api().query_stream(query))) \\\n",
" .pipe(ops.flat_map(lambda records: rx.from_iterable(records))) \\\n",
" .subscribe(observer=lambda record: sink.emit(record), on_error=lambda error: print(error))\n",
" f'{query}')) \\\n",
" .pipe(ops.map(lambda query: client.query_api().query_data_frame(query, data_frame_index=['_time']))) \\\n",
" .pipe(ops.map(lambda data_frame: data_frame.drop(columns=['result', 'table']))) \\\n",
" .subscribe(observer=lambda data_frame: sink.emit(data_frame), on_error=lambda error: print(error))\n",
" pass"
]
},
@@ -70,10 +71,7 @@
"metadata": {},
"outputs": [],
"source": [
"client = InfluxDBClient(url=\"http://localhost:9999\", token=\"my-token\", org=\"my-org\", debug=False)\n",
"\n",
"sink = Stream()\n",
"source_data(auto_refresh=5, sink=sink)"
"client = InfluxDBClient(url='http://localhost:9999', token='my-token', org='my-org')"
]
},
{
@@ -82,12 +80,17 @@
"metadata": {},
"outputs": [],
"source": [
"cpu_example = pd.DataFrame({'value': []}, columns=['value'])\n",
"cpu_query = '|> filter(fn: (r) => r._measurement == \"cpu\") ' \\\n",
" '|> filter(fn: (r) => r._field == \"usage_user\") ' \\\n",
" '|> filter(fn: (r) => r.cpu == \"cpu-total\") ' \\\n",
" '|> keep(columns: [\"_time\", \"_value\"])'\n",
"\n",
"\n",
"cpu_sink = Stream()\n",
"cpu_example = pd.DataFrame({'_value': []}, columns=['_value'])\n",
"cpu_df = DataFrame(cpu_sink, example=cpu_example)\n",
"\n",
"cpu_sink = sink\\\n",
" .filter(lambda record: (record[\"_measurement\"] == \"cpu\") & (record[\"_field\"] == \"usage_user\"))\\\n",
" .map(lambda record: pd.DataFrame({'value': [record[\"_value\"]]}, columns=['value'], index=[record[\"_time\"]]))\n",
"cpu = DataFrame(cpu_sink, example=cpu_example)"
"source_data(auto_refresh=5, sink=cpu_sink, query=cpu_query)"
]
},
{
@@ -96,14 +99,17 @@
"metadata": {},
"outputs": [],
"source": [
"mem_example = pd.DataFrame({'field': [], 'value': []}, columns=['field', 'value'])\n",
"mem_query = '|> filter(fn: (r) => r._measurement == \"mem\") ' \\\n",
" '|> filter(fn: (r) => r._field == \"available\" or r._field == \"free\" or r._field == \"total\" or r._field == \"used\") ' \\\n",
" '|> map(fn: (r) => ({ r with _value: r._value / 1024 / 1024 }))' \\\n",
" '|> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")' \\\n",
" '|> keep(columns: [\"_time\", \"used\", \"total\", \"free\", \"available\"])'\n",
"\n",
"mem_sink = sink \\\n",
" .filter(lambda record: record[\"_measurement\"] == \"mem\") \\\n",
" .filter(lambda record: record[\"_field\"] in [\"total\", \"used\", \"free\", \"available\"]) \\\n",
" .map(lambda record: pd.DataFrame({'field': record[\"_field\"], 'value': record[\"_value\"]},\n",
" columns=['field', 'value'], index=[record[\"_time\"], record[\"_field\"]]))\n",
"mem = DataFrame(mem_sink, example=mem_example)"
"mem_sink = Stream()\n",
"mem_example = pd.DataFrame({'used': [], 'total': [], 'free': [], 'available': []}, columns=['available', 'free', 'total', 'used'])\n",
"mem_df = DataFrame(mem_sink, example=mem_example)\n",
"\n",
"source_data(auto_refresh=5, sink=mem_sink, query=mem_query)"
]
},
{
@@ -130,16 +136,8 @@
" years=[\"%H:%M:%S\"],\n",
")\n",
"\n",
"cpu.hvplot(width=700, backlog=50, title='CPU % usage', xlabel='Time', ylabel='%', xformatter=formatter)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
" mem.groupby('field').sum().hvplot.bar()"
"cpu_df.hvplot(width=450, backlog=50, title='CPU % usage', xlabel='Time', ylabel='%', xformatter=formatter) +\\\n",
"mem_df.hvplot.line(width=450, backlog=50, title='Memory', xlabel='Time', ylabel='MiB', xformatter=formatter, legend='top_left')"
]
},
{
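
The notebook rework above replaces per-record streaming with whole DataFrames indexed by _time. A condensed, non-notebook sketch of the same pipeline for the CPU panel, assuming influxdb_client, streamz and rx (RxPY 3) are installed and InfluxDB is reachable at http://localhost:9999; it mirrors the cells above rather than adding new behaviour.

from datetime import timedelta

import pandas as pd
import rx
from rx import operators as ops
from streamz import Stream
from streamz.dataframe import DataFrame

from influxdb_client import InfluxDBClient

client = InfluxDBClient(url='http://localhost:9999', token='my-token', org='my-org')

cpu_query = '|> filter(fn: (r) => r._measurement == "cpu") ' \
            '|> filter(fn: (r) => r._field == "usage_user") ' \
            '|> keep(columns: ["_time", "_value"])'

# The sink receives one DataFrame per poll; the example frame tells streamz
# which columns to expect in the streaming DataFrame.
cpu_sink = Stream()
cpu_df = DataFrame(cpu_sink, example=pd.DataFrame({'_value': []}, columns=['_value']))

rx.interval(period=timedelta(seconds=5)) \
    .pipe(ops.map(lambda _: f'from(bucket: "my-bucket") |> range(start: -5s, stop: now()) {cpu_query}'),
          ops.map(lambda query: client.query_api().query_data_frame(query, data_frame_index=['_time'])),
          ops.map(lambda frame: frame.drop(columns=['result', 'table']))) \
    .subscribe(on_next=lambda frame: cpu_sink.emit(frame), on_error=print)

# cpu_df can then be plotted with hvplot exactly as in the notebook cell above.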
