Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: write uses precision from Point instead of a default precision #108

Merged
merged 3 commits into from
Jun 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -113,5 +113,5 @@ sandbox

# OpenAPI-generator
/.openapi-generator*
/tests/writer.pickle
**/writer.pickle
/tests/data_frame_file.csv
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

### Bug Fixes
1. [#105](https://github.com/influxdata/influxdb-client-python/pull/105): Fixed mapping dictionary without timestamp and tags into LineProtocol
1. [#108](https://github.com/influxdata/influxdb-client-python/pull/108): The WriteApi uses the precision from a Point instead of a default precision

## 1.7.0 [2020-05-15]

Expand Down
4 changes: 4 additions & 0 deletions influxdb_client/client/write/point.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ def to_line_protocol(self):

return f"{_measurement}{_tags}{_fields}{_time}"

@property
def write_precision(self):
    """Return the write precision this point was configured with."""
    return self._write_precision


def _append_tags(tags):
_return = []
Expand Down
49 changes: 26 additions & 23 deletions influxdb_client/client/write_api.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
# coding: utf-8
import logging
import os
import re
from collections import defaultdict
from datetime import timedelta
from enum import Enum
from functools import reduce
from itertools import chain
from random import random
from time import sleep
from typing import Union, List
from typing import Union, List, Any

import rx
from rx import operators as ops, Observable
Expand Down Expand Up @@ -186,13 +184,13 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions()
def write(self, bucket: str, org: str = None,
record: Union[
str, List['str'], Point, List['Point'], dict, List['dict'], bytes, List['bytes'], Observable] = None,
write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs) -> None:
write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs) -> Any:
"""
Writes time-series data into influxdb.

:param str org: specifies the destination organization for writes; take either the ID or Name interchangeably; if both orgID and org are specified, org takes precedence. (required)
:param str bucket: specifies the destination bucket for writes (required)
:param WritePrecision write_precision: specifies the precision for the unix timestamps within the body line-protocol
:param WritePrecision write_precision: specifies the precision for the unix timestamps within the body line-protocol. The precision specified on a Point takes precedence and is used for the write.
:param record: Points, line protocol, Pandas DataFrame, RxPY Observable to write
:param data_frame_measurement_name: name of measurement for writing Pandas DataFrame
:param data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields
Expand All @@ -217,11 +215,21 @@ def write(self, bucket: str, org: str = None,
return self._write_batching(bucket, org, record,
write_precision, **kwargs)

final_string = self._serialize(record, write_precision, **kwargs)
payloads = defaultdict(list)
self._serialize(record, write_precision, payloads, **kwargs)

_async_req = True if self._write_options.write_type == WriteType.asynchronous else False

return self._post_write(_async_req, bucket, org, final_string, write_precision)
def write_payload(payload):
final_string = b'\n'.join(payload[1])
return self._post_write(_async_req, bucket, org, final_string, payload[0])

results = list(map(write_payload, payloads.items()))
if not _async_req:
return None
elif len(results) == 1:
return results[0]
return results

def flush(self):
# TODO
Expand All @@ -241,44 +249,39 @@ def __del__(self):
self._disposable = None
pass

def _serialize(self, record, write_precision, **kwargs) -> bytes:
_result = b''
def _serialize(self, record, write_precision, payload, **kwargs):
if isinstance(record, bytes):
_result = record
payload[write_precision].append(record)

elif isinstance(record, str):
_result = record.encode("utf-8")
self._serialize(record.encode("utf-8"), write_precision, payload, **kwargs)

elif isinstance(record, Point):
_result = self._serialize(record.to_line_protocol(), write_precision, **kwargs)
self._serialize(record.to_line_protocol(), record.write_precision, payload, **kwargs)

elif isinstance(record, dict):
_result = self._serialize(Point.from_dict(record, write_precision=write_precision),
write_precision, **kwargs)
self._serialize(Point.from_dict(record, write_precision=write_precision), write_precision, payload, **kwargs)
elif 'DataFrame' in type(record).__name__:
_data = self._data_frame_to_list_of_points(record, precision=write_precision, **kwargs)
_result = self._serialize(_data, write_precision, **kwargs)
self._serialize(_data, write_precision, payload, **kwargs)

elif isinstance(record, list):
_result = b'\n'.join([self._serialize(item, write_precision,
**kwargs) for item in record])

return _result
for item in record:
self._serialize(item, write_precision, payload, **kwargs)

def _write_batching(self, bucket, org, data,
precision=DEFAULT_WRITE_PRECISION,
**kwargs):
_key = _BatchItemKey(bucket, org, precision)
if isinstance(data, bytes):
_key = _BatchItemKey(bucket, org, precision)
self._subject.on_next(_BatchItem(key=_key, data=data))

elif isinstance(data, str):
self._write_batching(bucket, org, data.encode("utf-8"),
precision, **kwargs)

elif isinstance(data, Point):
self._write_batching(bucket, org, data.to_line_protocol(),
precision, **kwargs)
self._write_batching(bucket, org, data.to_line_protocol(), data.write_precision, **kwargs)

elif isinstance(data, dict):
self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision),
Expand Down
69 changes: 64 additions & 5 deletions tests/test_WriteApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ def test_write_data_frame(self):
data_frame_tag_columns=['location'])

result = self.query_api.query(
"from(bucket:\"" + bucket.name + "\") |> range(start: 1970-01-01T00:00:00.000000001Z)", self.org)
f"from(bucket:\"{bucket.name}\") |> range(start: 1970-01-01T00:00:00.000000001Z)", self.org)

self.assertEqual(1, len(result))
self.assertEqual(2, len(result[0].records))
Expand Down Expand Up @@ -295,7 +295,7 @@ def test_write_empty_data(self):
bucket = self.create_test_bucket()

with self.assertRaises(ApiException) as cm:
self.write_client.write(bucket.name)
self.write_client.write(bucket.name, record="")
exception = cm.exception

self.assertEqual(400, exception.status)
Expand All @@ -306,6 +306,34 @@ def test_write_empty_data(self):

self.assertEqual(len(result), 0)

def test_write_point_different_precision(self):
    """Write two points carrying different precisions and query them back."""
    bucket = self.create_test_bucket()

    # One point with second precision, one with millisecond precision.
    low_point = Point('test_precision') \
        .field('power', 10) \
        .tag('powerFlow', 'low') \
        .time(datetime.datetime(2020, 4, 20, 6, 30, tzinfo=datetime.timezone.utc), WritePrecision.S)

    high_point = Point('test_precision') \
        .field('power', 20) \
        .tag('powerFlow', 'high') \
        .time(datetime.datetime(2020, 4, 20, 5, 30, tzinfo=datetime.timezone.utc), WritePrecision.MS)

    writer = self.client.write_api(write_options=SYNCHRONOUS)
    writer.write(bucket.name, self.org, [low_point, high_point])

    result = self.query_api.query(
        f"from(bucket:\"{bucket.name}\") |> range(start: 1970-01-01T00:00:00.000000001Z) |> last() "
        "|> sort(columns: [\"_time\"], desc: false)", self.org)

    # Expect one table per tag value, each with a single record, ordered by time.
    self.assertEqual(len(result), 2)
    expected_times = [
        datetime.datetime(2020, 4, 20, 5, 30, tzinfo=datetime.timezone.utc),
        datetime.datetime(2020, 4, 20, 6, 30, tzinfo=datetime.timezone.utc),
    ]
    for table, expected_time in zip(result, expected_times):
        self.assertEqual(len(table.records), 1)
        self.assertEqual(table.records[0].get_time(), expected_time)


class AsynchronousWriteTest(BaseTest):

Expand Down Expand Up @@ -381,9 +409,9 @@ def test_use_default_tags_with_dictionaries(self):
bucket = self.create_test_bucket()

_point1 = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
"time": "2009-11-10T22:00:00Z", "fields": {"water_level": 1.0}}
"time": "2009-11-10T22:00:00Z", "fields": {"water_level": 1.0}}
_point2 = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
"time": "2009-11-10T23:00:00Z", "fields": {"water_level": 2.0}}
"time": "2009-11-10T23:00:00Z", "fields": {"water_level": 2.0}}

_point_list = [_point1, _point2]

Expand Down Expand Up @@ -424,7 +452,7 @@ def test_use_default_tags_with_data_frame(self):
columns=["location", "water_level"])

async_result = self.write_client.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet',
data_frame_tag_columns=['location'])
data_frame_tag_columns=['location'])
async_result.get()

query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)'
Expand Down Expand Up @@ -487,6 +515,37 @@ def test_write_bytes(self):

self.delete_test_bucket(bucket)

def test_write_point_different_precision(self):
    """Two points with different precisions should yield two async write results."""
    bucket = self.create_test_bucket()

    seconds_point = Point("h2o_feet").tag("location", "coyote_creek") \
        .field("level water_level", 5.0).time(5, WritePrecision.S)
    micros_point = Point("h2o_feet").tag("location", "coyote_creek") \
        .field("level water_level", 6.0).time(6, WritePrecision.US)

    async_results = self.write_client.write(bucket.name, self.org, [seconds_point, micros_point])

    # One pending request per precision group; wait for both to finish.
    self.assertEqual(2, len(async_results))
    for pending in async_results:
        pending.get()

    query = f'from(bucket:"{bucket.name}") |> range(start: 1970-01-01T00:00:00.000000001Z) '\
            '|> sort(columns: [\"_time\"], desc: false)'

    flux_result = self.client.query_api().query(query)
    self.assertEqual(1, len(flux_result))

    records = flux_result[0].records
    self.assertEqual(2, len(records))
    # Ascending by _time: 6 microseconds precedes 5 seconds after the epoch.
    self.assertEqual(records[0].get_time(),
                     datetime.datetime(1970, 1, 1, 0, 0, 0, 6, tzinfo=datetime.timezone.utc))
    self.assertEqual(records[1].get_time(),
                     datetime.datetime(1970, 1, 1, 0, 0, 5, tzinfo=datetime.timezone.utc))



class PointSettingTest(BaseTest):

Expand Down
19 changes: 19 additions & 0 deletions tests/test_WriteApiBatching.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,25 @@ def test_record_types(self):

pass

def test_write_point_different_precision(self):
    """Points with distinct precisions must be flushed as separate batched requests."""
    httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204)

    point_seconds = Point("h2o_feet").tag("location", "coyote_creek").field("level water_level", 5.0).time(5, WritePrecision.S)
    point_nanos = Point("h2o_feet").tag("location", "coyote_creek").field("level water_level", 6.0).time(6, WritePrecision.NS)

    self._write_client.write("my-bucket", "my-org", [point_seconds, point_nanos])

    # Give the background batching thread time to flush.
    time.sleep(1)

    requests = httpretty.httpretty.latest_requests
    self.assertEqual(2, len(requests))

    # Each request must carry the line protocol and precision of its own group.
    self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=5.0 5", requests[0].parsed_body)
    self.assertEqual("s", requests[0].querystring["precision"][0])
    self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=6.0 6", requests[1].parsed_body)
    self.assertEqual("ns", requests[1].querystring["precision"][0])

def test_write_result(self):
httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204)

Expand Down
2 changes: 1 addition & 1 deletion tests/test_point.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def test_TimePrecisionDefault(self):
.tag("location", "europe") \
.field("level", 2)

self.assertEqual(WritePrecision.NS, point._write_precision)
self.assertEqual(WritePrecision.NS, point.write_precision)

def test_TimeSpanFormatting(self):
point = Point.measurement("h2o") \
Expand Down