Skip to content

Commit

Permalink
feat: bind query parameters (#219)
Browse files Browse the repository at this point in the history
  • Loading branch information
rhajek committed Apr 1, 2021
1 parent 8df7b18 commit 9184914
Show file tree
Hide file tree
Showing 5 changed files with 333 additions and 22 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 1.17.0 [unreleased]

### Features
1. [#219](https://github.com/influxdata/influxdb-client-python/issues/219): Bind query parameters

## 1.16.0 [2021-04-01]

### Features
Expand Down
35 changes: 32 additions & 3 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -464,9 +464,10 @@ Queries
The result retrieved by `QueryApi <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/query_api.py>`_ could be formatted as a:

1. Flux data structure: `FluxTable <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L5>`_, `FluxColumn <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L22>`_ and `FluxRecord <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L31>`_
2. `csv.reader <https://docs.python.org/3.4/library/csv.html#reader-objects>`__ which will iterate over CSV lines
3. Raw unprocessed results as a ``str`` iterator
4. `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
2. Query bind parameters
3. `csv.reader <https://docs.python.org/3.4/library/csv.html#reader-objects>`__ which will iterate over CSV lines
4. Raw unprocessed results as a ``str`` iterator
5. `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_

The API also support streaming ``FluxRecord`` via `query_stream <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/query_api.py#L77>`_, see example below:

Expand Down Expand Up @@ -502,6 +503,34 @@ The API also support streaming ``FluxRecord`` via `query_stream <https://github.
print()
print()
"""
Query: using Bind parameters
"""
p = {"_start": datetime.timedelta(hours=-1),
"_location": "Prague",
"_desc": True,
"_floatParam": 25.1,
"_every": datetime.timedelta(minutes=5)
}
tables = query_api.query('''
from(bucket:"my-bucket") |> range(start: _start)
|> filter(fn: (r) => r["_measurement"] == "my_measurement")
|> filter(fn: (r) => r["_field"] == "temperature")
|> filter(fn: (r) => r["location"] == _location and r["_value"] > _floatParam)
|> aggregateWindow(every: _every, fn: mean, createEmpty: true)
|> sort(columns: ["_time"], desc: _desc)
''', params=p)
for table in tables:
print(table)
for record in table.records:
print(str(record["_time"]) + " - " + record["location"] + ": " + str(record["_value"]))
print()
print()
"""
Query: using Stream
"""
Expand Down
43 changes: 38 additions & 5 deletions examples/query.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import datetime as datetime

from influxdb_client import InfluxDBClient, Point, Dialect
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org",debug=True)

write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
Expand All @@ -28,6 +30,34 @@
print()
print()

"""
Query: using Bind parameters
"""

p = {"_start": datetime.timedelta(hours=-1),
"_location": "Prague",
"_desc": True,
"_floatParam": 25.1,
"_every": datetime.timedelta(minutes=5)
}

tables = query_api.query('''
from(bucket:"my-bucket") |> range(start: _start)
|> filter(fn: (r) => r["_measurement"] == "my_measurement")
|> filter(fn: (r) => r["_field"] == "temperature")
|> filter(fn: (r) => r["location"] == _location and r["_value"] > _floatParam)
|> aggregateWindow(every: _every, fn: mean, createEmpty: true)
|> sort(columns: ["_time"], desc: _desc)
''', params=p)

for table in tables:
print(table)
for record in table.records:
print(str(record["_time"]) + " - " + record["location"] + ": " + str(record["_value"]))

print()
print()

"""
Query: using Stream
"""
Expand Down Expand Up @@ -66,10 +96,13 @@
"""
Query: using Pandas DataFrame
"""
data_frame = query_api.query_data_frame('from(bucket:"my-bucket") '
'|> range(start: -10m) '
'|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") '
'|> keep(columns: ["location", "temperature"])')
data_frame = query_api.query_data_frame('''
from(bucket:"my-bucket")
|> range(start: -10m)
|> filter(fn: (r) => r["_measurement"] == "my_measurement")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> keep(columns: ["_time","location", "temperature"])
''')
print(data_frame.to_string())

"""
Expand Down
81 changes: 67 additions & 14 deletions influxdb_client/client/query_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@

import codecs
import csv
from datetime import datetime, timedelta
from typing import List, Generator, Any
from pytz import UTC

from influxdb_client import Dialect
from influxdb_client import Dialect, IntegerLiteral, BooleanLiteral, FloatLiteral, DateTimeLiteral, StringLiteral, \
VariableAssignment, Identifier, OptionStatement, File, DurationLiteral, Duration, UnaryExpression
from influxdb_client import Query, QueryService
from influxdb_client.client.flux_csv_parser import FluxCsvParser, FluxSerializationMode
from influxdb_client.client.flux_table import FluxTable, FluxRecord
Expand All @@ -29,51 +32,54 @@ def __init__(self, influxdb_client):
self._influxdb_client = influxdb_client
self._query_api = QueryService(influxdb_client.api_client)

def query_csv(self, query: str, org=None, dialect: Dialect = default_dialect, params: dict = None):
    """
    Execute the Flux query and return results as a CSV iterator. Each iteration returns a row of the CSV file.

    :param query: a Flux query
    :param org: organization name (optional if already specified in InfluxDBClient)
    :param dialect: csv dialect format
    :param params: bind parameters
    :return: The returned object is an iterator. Each iteration returns a row of the CSV file
             (which can span multiple input lines).
    """
    if org is None:
        # Fall back to the organization configured on the client.
        org = self._influxdb_client.org
    response = self._query_api.post_query(org=org, query=self._create_query(query, dialect, params),
                                          async_req=False, _preload_content=False)

    # Decode the raw byte stream lazily and hand it to the CSV reader.
    return csv.reader(codecs.iterdecode(response, 'utf-8'))

def query_raw(self, query: str, org=None, dialect=default_dialect, params: dict = None):
    """
    Execute synchronous Flux query and return result as raw unprocessed result as a str.

    :param query: a Flux query
    :param org: organization name (optional if already specified in InfluxDBClient)
    :param dialect: csv dialect format
    :param params: bind parameters
    :return: str
    """
    if org is None:
        # Fall back to the organization configured on the client.
        org = self._influxdb_client.org
    result = self._query_api.post_query(org=org, query=self._create_query(query, dialect, params),
                                        async_req=False, _preload_content=False)

    return result

def query(self, query: str, org=None) -> List['FluxTable']:
def query(self, query: str, org=None, params: dict = None) -> List['FluxTable']:
"""
Execute synchronous Flux query and return result as a List['FluxTable'].
:param query: the Flux query
:param org: organization name (optional if already specified in InfluxDBClient)
:param params: bind parameters
:return:
"""
if org is None:
org = self._influxdb_client.org

response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect, params),
async_req=False, _preload_content=False, _return_http_data_only=False)

_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.tables)
Expand All @@ -82,12 +88,14 @@ def query(self, query: str, org=None) -> List['FluxTable']:

return _parser.tables

def query_stream(self, query: str, org=None) -> Generator['FluxRecord', Any, None]:
def query_stream(self, query: str, org=None, params: dict = None) -> Generator['FluxRecord', Any, None]:
"""
Execute synchronous Flux query and return stream of FluxRecord as a Generator['FluxRecord'].
:param query: the Flux query
:param params: the Flux query parameters
:param org: organization name (optional if already specified in InfluxDBClient)
:param params: bind parameters
:return:
"""
if org is None:
Expand All @@ -100,7 +108,7 @@ def query_stream(self, query: str, org=None) -> Generator['FluxRecord', Any, Non

return _parser.generator()

def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = None):
def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None):
"""
Execute synchronous Flux query and return Pandas DataFrame.
Expand All @@ -109,11 +117,12 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N
:param query: the Flux query
:param org: organization name (optional if already specified in InfluxDBClient)
:param data_frame_index: the list of columns that are used as DataFrame index
:param params: bind parameters
:return:
"""
from ..extras import pd

_generator = self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index)
_generator = self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index, params=params)
_dataFrames = list(_generator)

if len(_dataFrames) == 0:
Expand All @@ -123,7 +132,7 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N
else:
return _dataFrames

def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[str] = None):
def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None):
"""
Execute synchronous Flux query and return stream of Pandas DataFrame as a Generator['pd.DataFrame'].
Expand All @@ -132,12 +141,13 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s
:param query: the Flux query
:param org: organization name (optional if already specified in InfluxDBClient)
:param data_frame_index: the list of columns that are used as DataFrame index
:param params: bind parameters
:return:
"""
if org is None:
org = self._influxdb_client.org

response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect, params),
async_req=False, _preload_content=False, _return_http_data_only=False)

_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame,
Expand All @@ -146,10 +156,53 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s

# Private helper: builds the Query request body, attaching bind parameters as an extern AST.
@staticmethod
def _create_query(query, dialect=default_dialect, params: dict = None):
    """Create a ``Query`` object from Flux source, dialect and optional bind parameters.

    :param query: the Flux query text
    :param dialect: csv dialect format for the response
    :param params: bind parameters; when given they are compiled into the ``extern`` AST
    :return: the assembled ``Query`` request object
    """
    created = Query(query=query, dialect=dialect, extern=QueryApi._build_flux_ast(params))
    return created

@staticmethod
def _params_to_extern_ast(params: dict) -> List['OptionStatement']:
    """Translate a dict of bind parameters into Flux AST option statements.

    Each ``(name, value)`` pair becomes ``option name = <literal>``. Python
    bools, ints, floats, datetimes, timedeltas and strings are mapped to the
    corresponding Flux literal nodes; any other value is passed through
    unchanged (assumed to already be an AST node).
    """

    def to_literal(value):
        # bool must be tested before int — bool is a subclass of int.
        if isinstance(value, bool):
            return BooleanLiteral("BooleanLiteral", value)
        if isinstance(value, int):
            return IntegerLiteral("IntegerLiteral", str(value))
        if isinstance(value, float):
            return FloatLiteral("FloatLiteral", value)
        if isinstance(value, datetime):
            # Normalize to UTC; a naive datetime is treated as already-UTC.
            at_utc = UTC.localize(value) if not value.tzinfo else value.astimezone(UTC)
            return DateTimeLiteral("DateTimeLiteral", at_utc.strftime('%Y-%m-%dT%H:%M:%S.%fZ'))
        if isinstance(value, timedelta):
            # Express the interval in whole microseconds.
            micros = int(value / timedelta(microseconds=1))
            if micros < 0:
                # Duration magnitudes are unsigned, so negate and wrap in a unary "-".
                negated = DurationLiteral("DurationLiteral", [Duration(magnitude=-micros, unit="us")])
                return UnaryExpression("UnaryExpression", argument=negated, operator="-")
            return DurationLiteral("DurationLiteral", [Duration(magnitude=micros, unit="us")])
        if isinstance(value, str):
            return StringLiteral("StringLiteral", str(value))
        return value

    return [OptionStatement("OptionStatement",
                            VariableAssignment("VariableAssignment",
                                               Identifier("Identifier", name),
                                               to_literal(value)))
            for name, value in params.items()]

@staticmethod
def _build_flux_ast(params: dict = None):
    """Build the ``extern`` Flux AST ``File`` carrying the bind parameters.

    :param params: bind parameters, or None
    :return: a ``File`` AST node whose body holds one option statement per
             parameter, or None when no parameters were supplied
    """
    if params is None:
        return None
    option_statements = QueryApi._params_to_extern_ast(params)
    return File(package=None, name=None, type=None, imports=[], body=option_statements)

def __del__(self):
    """Close QueryAPI.

    Intentionally a no-op: no cleanup is performed here.
    """
    pass
Loading

0 comments on commit 9184914

Please sign in to comment.