Skip to content

Commit

Permalink
fix: add support for "with .. as .." statement for cleaner exception … (
Browse files Browse the repository at this point in the history
  • Loading branch information
rhajek authored Apr 13, 2021
1 parent d465dda commit 4792fd5
Show file tree
Hide file tree
Showing 15 changed files with 504 additions and 502 deletions.
130 changes: 63 additions & 67 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ Writes
The `WriteApi <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write_api.py>`_ supports synchronous, asynchronous and batching writes into InfluxDB 2.0.
The data should be passed as a `InfluxDB Line Protocol <https://docs.influxdata.com/influxdb/latest/write_protocols/line_protocol_tutorial/>`_\ , `Data Point <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write/point.py>`_ or Observable stream.

**Important: The WriteApi in batching mode (default mode) is supposed to run as a singleton. To flush all your data you should call ``_write_client.close()`` at the end of your script.**
**Important: The WriteApi in batching mode (default mode) is supposed to run as a singleton. To flush all your data you should wrap the execution using the ``with client.write_api(...) as write_api:`` statement or call ``_write_client.close()`` at the end of your script.**

*The default instance of WriteApi uses batching.*

Expand Down Expand Up @@ -280,73 +280,69 @@ The batching is configurable by ``write_options``\ :
from influxdb_client import InfluxDBClient, Point, WriteOptions

# README snippet: batching writes wrapped in "with .. as .." so that both the
# client and the WriteApi are flushed/closed automatically on exit — no explicit
# close() calls are needed.
# NOTE(review): this snippet also relies on `rx`/`ops` (RxPY), `pd` (pandas) and
# `datetime`/`timedelta`/`UTC` being imported earlier in the README — confirm.
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as _client:

    with _client.write_api(write_options=WriteOptions(batch_size=500,
                                                      flush_interval=10_000,
                                                      jitter_interval=2_000,
                                                      retry_interval=5_000,
                                                      max_retries=5,
                                                      max_retry_delay=30_000,
                                                      exponential_base=2)) as _write_client:

        """
        Write Line Protocol formatted as string
        """
        _write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1")
        _write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2",
                                                    "h2o_feet,location=coyote_creek water_level=3.0 3"])

        """
        Write Line Protocol formatted as byte array
        """
        _write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1".encode())
        _write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2".encode(),
                                                    "h2o_feet,location=coyote_creek water_level=3.0 3".encode()])

        """
        Write Dictionary-style object
        """
        _write_client.write("my-bucket", "my-org", {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
                                                    "fields": {"water_level": 1.0}, "time": 1})
        _write_client.write("my-bucket", "my-org", [{"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
                                                     "fields": {"water_level": 2.0}, "time": 2},
                                                    {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
                                                     "fields": {"water_level": 3.0}, "time": 3}])

        """
        Write Data Point
        """
        _write_client.write("my-bucket", "my-org",
                            Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 4.0).time(4))
        _write_client.write("my-bucket", "my-org",
                            [Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 5.0).time(5),
                             Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 6.0).time(6)])

        """
        Write Observable stream
        """
        _data = rx \
            .range(7, 11) \
            .pipe(ops.map(lambda i: "h2o_feet,location=coyote_creek water_level={0}.0 {0}".format(i)))
        _write_client.write("my-bucket", "my-org", _data)

        """
        Write Pandas DataFrame
        """
        _now = datetime.now(UTC)
        _data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
                                   index=[_now, _now + timedelta(hours=1)],
                                   columns=["location", "water_level"])
        _write_client.write("my-bucket", "my-org", record=_data_frame, data_frame_measurement_name='h2o_feet',
                            data_frame_tag_columns=['location'])
Default Tags
Expand Down
69 changes: 34 additions & 35 deletions examples/buckets_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,38 @@
url = "http://localhost:8086"
token = "my-token"

# Wrap the client in "with .. as .." so it is closed automatically on exit;
# no trailing client.close() is needed.
with InfluxDBClient(url=url, token=token) as client:
    buckets_api = client.buckets_api()

    """
    The Bucket API uses as a parameter the Organization ID. We have to retrieve ID by Organization API.
    """
    org_name = "my-org"
    org = list(filter(lambda it: it.name == org_name, client.organizations_api().find_organizations()))[0]

    """
    Create Bucket with retention policy set to 3600 seconds and name "bucket-by-python"
    """
    # plain strings: no placeholders, so no f-prefix needed
    print("------- Create -------\n")
    retention_rules = BucketRetentionRules(type="expire", every_seconds=3600)
    created_bucket = buckets_api.create_bucket(bucket_name="bucket-by-python",
                                               retention_rules=retention_rules,
                                               org_id=org.id)
    print(created_bucket)

    """
    List all Buckets
    """
    print("\n------- List -------\n")
    buckets = buckets_api.find_buckets().buckets
    print("\n".join([f" ---\n ID: {bucket.id}\n Name: {bucket.name}\n Retention: {bucket.retention_rules}"
                     for bucket in buckets]))
    print("---")

    """
    Delete previously created bucket
    """
    print("------- Delete -------\n")
    buckets_api.delete_bucket(created_bucket)
    print(f" successfully deleted bucket: {created_bucket.name}")
70 changes: 34 additions & 36 deletions examples/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,37 @@
import codecs
from datetime import datetime

from influxdb_client import WritePrecision, InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

# "with .. as .." closes the client (and flushes any pending writes) on exit.
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False) as client:
    query_api = client.query_api()

    p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3).time(datetime.utcnow(),
                                                                                          WritePrecision.MS)
    write_api = client.write_api(write_options=SYNCHRONOUS)

    # write using point structure
    write_api.write(bucket="my-bucket", record=p)

    line_protocol = p.to_line_protocol()
    print(line_protocol)

    # write using line protocol string
    write_api.write(bucket="my-bucket", record=line_protocol)

    # using Table structure
    tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')
    for table in tables:
        print(table)
        for record in table.records:
            # process record
            print(record.values)

    # using csv library
    csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)')
    val_count = 0
    for record in csv_result:
        for cell in record:
            val_count += 1
    print("val count: ", val_count)

    # raw (undecoded) query response
    response = query_api.query_raw('from(bucket:"my-bucket") |> range(start: -10m)')
    print(codecs.decode(response.data))
58 changes: 26 additions & 32 deletions examples/import_data_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,39 +60,33 @@ def parse_row(row: OrderedDict):
.from_iterable(DictReader(open('vix-daily.csv', 'r'))) \
.pipe(ops.map(lambda row: parse_row(row)))

# "with .. as .." guarantees the WriteApi is flushed/closed and the client is
# closed even if an exception is raised mid-import — no explicit close() calls.
# NOTE(review): `data` is the RxPY observable built from vix-daily.csv earlier
# in this script.
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True) as client:

    """
    Create client that writes data in batches with 50_000 items.
    """
    with client.write_api(write_options=WriteOptions(batch_size=50_000, flush_interval=10_000)) as write_api:

        """
        Write data into InfluxDB
        """
        write_api.write(bucket="my-bucket", record=data)

    """
    Querying max value of CBOE Volatility Index
    """
    query = 'from(bucket:"my-bucket")' \
            ' |> range(start: 0, stop: now())' \
            ' |> filter(fn: (r) => r._measurement == "financial-analysis")' \
            ' |> max()'
    result = client.query_api().query(query=query)

    """
    Processing results
    """
    print()
    print("=== results ===")
    print()
    for table in result:
        for record in table.records:
            print('max {0:5} = {1}'.format(record.get_field(), record.get_value()))
Loading

0 comments on commit 4792fd5

Please sign in to comment.