Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added possibility to use datetime nanoseconds precision by pandas.Timestamp #141

Merged
merged 3 commits into from
Aug 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

### Features
1. [#136](https://github.com/influxdata/influxdb-client-python/pull/136): Allows users to skip of verifying SSL certificate
1. [#143](https://github.com/influxdata/influxdb-client-python/pull/143): Skip of verifying SSL certificate could be configured via config file or environment properties
1. [#143](https://github.com/influxdata/influxdb-client-python/pull/143): Skip of verifying SSL certificate could be configured via config file or environment properties
1. [#141](https://github.com/influxdata/influxdb-client-python/pull/141): Added possibility to use datetime nanoseconds precision by `pandas.Timestamp`

## 1.9.0 [2020-07-17]

Expand Down
68 changes: 67 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -899,10 +899,76 @@ The following forward compatible APIs are available:

For detail info see `InfluxDB 1.8 example <examples/influxdb_18_example.py>`_.

Nanosecond precision
^^^^^^^^^^^^^^^^^^^^

The Python's `datetime <https://docs.python.org/3/library/datetime.html>`_ doesn't support precision with nanoseconds
so the library during writes and queries ignores everything after microseconds.

If you would like to use ``datetime`` with nanosecond precision you should use
`pandas.Timestamp <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Timestamp.html#pandas.Timestamp>`_
that is replacement for python ``datetime.datetime`` object and also you should set a proper ``DateTimeHelper`` to the client.

* sources - `nanosecond_precision.py <https://github.com/influxdata/influxdb-client-python/blob/master/examples/nanosecond_precision.py>`_

.. code-block:: python

from influxdb_client import Point, InfluxDBClient
from influxdb_client.client.util.date_utils_pandas import PandasDateTimeHelper
from influxdb_client.client.write_api import SYNCHRONOUS

"""
Set PandasDate helper which supports nanoseconds.
"""
import influxdb_client.client.util.date_utils as date_utils

date_utils.date_helper = PandasDateTimeHelper()

"""
Prepare client.
"""
client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")

write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

"""
Prepare data
"""

point = Point("h2o_feet") \
.field("water_level", 10) \
.tag("location", "pacific") \
.time('1996-02-25T21:20:00.001001231Z')

print(f'Time serialized with nanosecond precision: {point.to_line_protocol()}')
print()

write_api.write(bucket="my-bucket", record=point)

"""
Query: using Stream
"""
query = '''
from(bucket:"my-bucket")
|> range(start: 0, stop: now())
|> filter(fn: (r) => r._measurement == "h2o_feet")
'''
records = query_api.query_stream(query)

for record in records:
print(f'Temperature in {record["location"]} is {record["_value"]} at time: {record["_time"]}')

"""
Close client
"""
client.__del__()


Local tests
-----------

.. code-block:: python
.. code-block:: console

# start/restart InfluxDB2 on local machine using docker
./scripts/influxdb-restart.sh
Expand Down
50 changes: 50 additions & 0 deletions examples/nanosecond_precision.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from influxdb_client import Point, InfluxDBClient
from influxdb_client.client.util.date_utils_pandas import PandasDateTimeHelper
from influxdb_client.client.write_api import SYNCHRONOUS

"""
Set PandasDate helper which supports nanoseconds.
"""
import influxdb_client.client.util.date_utils as date_utils

date_utils.date_helper = PandasDateTimeHelper()

"""
Prepare client.
"""
client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")

write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

"""
Prepare data
"""

point = Point("h2o_feet") \
.field("water_level", 10) \
.tag("location", "pacific") \
.time('1996-02-25T21:20:00.001001231Z')

print(f'Time serialized with nanosecond precision: {point.to_line_protocol()}')
print()

write_api.write(bucket="my-bucket", record=point)

"""
Query: using Stream
"""
query = '''
from(bucket:"my-bucket")
|> range(start: 0, stop: now())
|> filter(fn: (r) => r._measurement == "h2o_feet")
'''
records = query_api.query_stream(query)

for record in records:
print(f'Temperature in {record["location"]} is {record["_value"]} at time: {record["_time"]}')

"""
Close client
"""
client.__del__()
18 changes: 0 additions & 18 deletions influxdb_client/client/date_utils.py

This file was deleted.

7 changes: 2 additions & 5 deletions influxdb_client/client/flux_csv_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from urllib3 import HTTPResponse

from influxdb_client.client.date_utils import get_date_parse_function
from influxdb_client.client.util.date_utils import get_date_helper
from influxdb_client.client.flux_table import FluxTable, FluxColumn, FluxRecord


Expand Down Expand Up @@ -208,10 +208,7 @@ def _to_value(self, str_val, column):
return base64.b64decode(str_val)

if "dateTime:RFC3339" == column.data_type or "dateTime:RFC3339Nano" == column.data_type:
# todo nanosecods precision
# return str_val
return get_date_parse_function()(str_val)
# return timestamp_parser(str_val)
return get_date_helper().parse_date(str_val)

if "duration" == column.data_type:
# todo better type ?
Expand Down
1 change: 1 addition & 0 deletions influxdb_client/client/util/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Utils package."""
49 changes: 49 additions & 0 deletions influxdb_client/client/util/date_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""Utils to get right Date parsing function."""

from dateutil import parser

date_helper = None


class DateHelper:
"""DateHelper to groups different implementations of date operations."""

def parse_date(self, date_string: str):
"""
Parse string into Date or Timestamp.

:return: Returns a :class:`datetime.datetime` object or compliant implementation
like :class:`class 'pandas._libs.tslibs.timestamps.Timestamp`
"""
pass

def to_nanoseconds(self, delta):
"""
Get number of nanoseconds in timedelta.

Solution comes from v1 client. Thx.
https://github.com/influxdata/influxdb-python/pull/811
"""
nanoseconds_in_days = delta.days * 86400 * 10 ** 9
nanoseconds_in_seconds = delta.seconds * 10 ** 9
nanoseconds_in_micros = delta.microseconds * 10 ** 3

return nanoseconds_in_days + nanoseconds_in_seconds + nanoseconds_in_micros


def get_date_helper() -> DateHelper:
"""
Return DateHelper with proper implementation.

If there is a 'ciso8601' than use 'ciso8601.parse_datetime' else use 'dateutil.parse'.
"""
global date_helper
if date_helper is None:
date_helper = DateHelper()
try:
import ciso8601
date_helper.parse_date = ciso8601.parse_datetime
except ModuleNotFoundError:
date_helper.parse_date = parser.parse

return date_helper
15 changes: 15 additions & 0 deletions influxdb_client/client/util/date_utils_pandas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Pandas date utils."""
from influxdb_client.client.util.date_utils import DateHelper
from influxdb_client.extras import pd


class PandasDateTimeHelper(DateHelper):
"""DateHelper that use Pandas library with nanosecond precision."""

def parse_date(self, date_string: str):
"""Parse date string into `class 'pandas._libs.tslibs.timestamps.Timestamp`."""
return pd.to_datetime(date_string)

def to_nanoseconds(self, delta):
"""Get number of nanoseconds with nanos precision."""
return super().to_nanoseconds(delta) + (delta.nanoseconds if hasattr(delta, 'nanoseconds') else 0)
19 changes: 4 additions & 15 deletions influxdb_client/client/write/point.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from pytz import UTC
from six import iteritems

from influxdb_client.client.date_utils import get_date_parse_function
from influxdb_client.client.util.date_utils import get_date_helper
from influxdb_client.domain.write_precision import WritePrecision

EPOCH = UTC.localize(datetime.utcfromtimestamp(0))
Expand Down Expand Up @@ -164,24 +164,13 @@ def _escape_string(value):
return str(value).translate(_ESCAPE_STRING)


def _to_nanoseconds(delta):
"""
Solution comes from v1 client. Thx.

https://github.com/influxdata/influxdb-python/pull/811
"""
nanoseconds_in_days = delta.days * 86400 * 10 ** 9
nanoseconds_in_seconds = delta.seconds * 10 ** 9
nanoseconds_in_micros = delta.microseconds * 10 ** 3
return nanoseconds_in_days + nanoseconds_in_seconds + nanoseconds_in_micros


def _convert_timestamp(timestamp, precision=DEFAULT_WRITE_PRECISION):
date_helper = get_date_helper()
if isinstance(timestamp, Integral):
return timestamp # assume precision is correct if timestamp is int

if isinstance(timestamp, str):
timestamp = get_date_parse_function()(timestamp)
timestamp = date_helper.parse_date(timestamp)

if isinstance(timestamp, timedelta) or isinstance(timestamp, datetime):

Expand All @@ -192,7 +181,7 @@ def _convert_timestamp(timestamp, precision=DEFAULT_WRITE_PRECISION):
timestamp = timestamp.astimezone(UTC)
timestamp = timestamp - EPOCH

ns = _to_nanoseconds(timestamp)
ns = date_helper.to_nanoseconds(timestamp)

if precision is None or precision == WritePrecision.NS:
return ns
Expand Down
35 changes: 35 additions & 0 deletions tests/test_PandasDateTimeHelper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import unittest
from datetime import datetime, timedelta

from pytz import UTC

from influxdb_client.client.util.date_utils_pandas import PandasDateTimeHelper


class PandasDateTimeHelperTest(unittest.TestCase):

def setUp(self) -> None:
self.helper = PandasDateTimeHelper()

def test_parse_date(self):
date = self.helper.parse_date('2020-08-07T06:21:57.331249158Z')

self.assertEqual(date.year, 2020)
self.assertEqual(date.month, 8)
self.assertEqual(date.day, 7)
self.assertEqual(date.hour, 6)
self.assertEqual(date.minute, 21)
self.assertEqual(date.second, 57)
self.assertEqual(date.microsecond, 331249)
self.assertEqual(date.nanosecond, 158)

def test_to_nanoseconds(self):
date = self.helper.parse_date('2020-08-07T06:21:57.331249158Z')
nanoseconds = self.helper.to_nanoseconds(date - UTC.localize(datetime.utcfromtimestamp(0)))

self.assertEqual(nanoseconds, 1596781317331249158)

def test_to_nanoseconds_buildin_timedelta(self):
nanoseconds = self.helper.to_nanoseconds(timedelta(days=1))

self.assertEqual(nanoseconds, 86400000000000)
36 changes: 36 additions & 0 deletions tests/test_WriteApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,42 @@ def test_check_write_permission_by_empty_data(self):

client.__del__()

def test_write_query_data_nanoseconds(self):

from influxdb_client.client.util.date_utils_pandas import PandasDateTimeHelper
import influxdb_client.client.util.date_utils as date_utils

date_utils.date_helper = PandasDateTimeHelper()

bucket = self.create_test_bucket()

point = Point("h2o_feet") \
.field("water_level", 155) \
.tag("location", "creek level")\
.time('1996-02-25T21:20:00.001001231Z')

self.write_client.write(bucket.name, self.org, [point])

flux_result = self.client.query_api().query(
f'from(bucket:"{bucket.name}") |> range(start: 1970-01-01T00:00:00.000000001Z)')
self.assertEqual(1, len(flux_result))

record = flux_result[0].records[0]

self.assertEqual(self.id_tag, record["id"])
self.assertEqual(record["_value"], 155)
self.assertEqual(record["location"], "creek level")
self.assertEqual(record["_time"].year, 1996)
self.assertEqual(record["_time"].month, 2)
self.assertEqual(record["_time"].day, 25)
self.assertEqual(record["_time"].hour, 21)
self.assertEqual(record["_time"].minute, 20)
self.assertEqual(record["_time"].second, 00)
self.assertEqual(record["_time"].microsecond, 1001)
self.assertEqual(record["_time"].nanosecond, 231)

date_utils.date_helper = None


class AsynchronousWriteTest(BaseTest):

Expand Down