feat: Add InfluxLoggingHandler to use the InfluxClient in python native logging #405

Merged
merged 10 commits on Feb 28, 2022
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

### Features
1. [#412](https://github.com/influxdata/influxdb-client-python/pull/412): `DeleteApi` uses default value from `InfluxDBClient.org` if an `org` parameter is not specified
2. [#405](https://github.com/influxdata/influxdb-client-python/pull/405): Add `InfluxLoggingHandler`, a handler for using the client with native Python logging

### CI
1. [#411](https://github.com/influxdata/influxdb-client-python/pull/411): Use new Codecov uploader for reporting code coverage
@@ -45,6 +46,7 @@ This release introduces support for a new version of the InfluxDB OSS API definition
1. [#408](https://github.com/influxdata/influxdb-client-python/pull/408): Improve error message when the client cannot find organization by name
1. [#407](https://github.com/influxdata/influxdb-client-python/pull/407): Use `pandas.concat()` instead of deprecated `DataFrame.append()` [DataFrame]


## 1.25.0 [2022-01-20]

### Features
3 changes: 2 additions & 1 deletion examples/README.md
@@ -9,6 +9,7 @@
- [import_data_set_sync_batching.py](import_data_set_sync_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches for synchronous write into InfluxDB
- [write_api_callbacks.py](write_api_callbacks.py) - How to handle batch events
- [write_structured_data.py](write_structured_data.py) - How to write structured data - [NamedTuple](https://docs.python.org/3/library/collections.html#collections.namedtuple), [Data Classes](https://docs.python.org/3/library/dataclasses.html) - (_requires Python v3.8+_)
- [logging_handler.py](logging_handler.py) - How to set up a Python native logging handler that writes to InfluxDB

## Queries
- [query.py](query.py) - How to query data into `FluxTable`s, `Stream` and `CSV`
@@ -28,4 +29,4 @@
- [influxdb_18_example.py](influxdb_18_example.py) - How to connect to InfluxDB 1.8
- [nanosecond_precision.py](nanosecond_precision.py) - How to use nanoseconds precision
- [invocable_scripts.py](invocable_scripts.py) - How to use Invocable scripts Cloud API to create custom endpoints that query data


54 changes: 54 additions & 0 deletions examples/logging_handler.py
@@ -0,0 +1,54 @@
"""
Show the usage of influx with python native logging.

This is useful if you
* want to log to influx and a local file.
* want to set up influx logging in a project without specifying it in submodules
"""
import datetime
import logging
import time

from influxdb_client import InfluxLoggingHandler, WritePrecision, Point
from influxdb_client.client.write_api import SYNCHRONOUS

DATA_LOGGER_NAME = '…'


def setup_logger():
"""
Set up data logger with the influx logging handler.

This can happen in your core module.
"""
influx_logging_handler = InfluxLoggingHandler(
url="http://localhost:8086", token="my-token", org="my-org", bucket="my-bucket",
client_args={'timeout': 30_000}, # optional configuration of the client
write_api_args={'write_options': SYNCHRONOUS}) # optional configuration of the write api
influx_logging_handler.setLevel(logging.DEBUG)

data_logger = logging.getLogger(DATA_LOGGER_NAME)
data_logger.setLevel(logging.DEBUG)
data_logger.addHandler(influx_logging_handler)
# feel free to add other handlers here.
# if you find yourself writing filters e.g. to only log points to influx, think about adding a PR :)


def use_logger():
"""Use the logger. This can happen in any submodule."""
# `data_logger` will have the influx_logging_handler attached if setup_logger was called somewhere.
data_logger = logging.getLogger(DATA_LOGGER_NAME)
# write a line yourself
data_logger.debug(f"my-measurement,host=host1 temperature=25.3 {int(time.time() * 1e9)}")
# or make use of the influxdb helpers like Point
data_logger.debug(
Point('my-measurement')
.tag('host', 'host1')
.field('temperature', 25.3)
.time(datetime.datetime.utcnow(), WritePrecision.MS)
)


if __name__ == "__main__":
    setup_logger()
    use_logger()
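A related sketch (not part of this example file or the PR): because the handler forwards whatever `Handler.format()` returns straight to the write API, a custom `logging.Formatter` can turn ordinary log calls into line protocol; the PR's `test_formatter` test exercises exactly this. The measurement name `app_logs` and the field name `message` below are illustrative assumptions.

import logging


class LineProtocolFormatter(logging.Formatter):
    """Render an ordinary log record as a single line protocol point (illustrative sketch)."""

    def format(self, record: logging.LogRecord) -> str:
        timestamp_ns = int(record.created * 1e9)
        # a production formatter would also escape commas, spaces and double quotes
        return f'app_logs,logger={record.name},level={record.levelname} message="{record.getMessage()}" {timestamp_ns}'


# attach it to the handler created in setup_logger(), for example:
# influx_logging_handler.setFormatter(LineProtocolFormatter())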
1 change: 1 addition & 0 deletions influxdb_client/__init__.py
@@ -383,6 +383,7 @@
from influxdb_client.client.users_api import UsersApi
from influxdb_client.client.write_api import WriteApi, WriteOptions
from influxdb_client.client.influxdb_client import InfluxDBClient
from influxdb_client.client.loggingHandler import InfluxLoggingHandler
from influxdb_client.client.write.point import Point

from influxdb_client.version import CLIENT_VERSION
64 changes: 64 additions & 0 deletions influxdb_client/client/loggingHandler.py
@@ -0,0 +1,64 @@
"""Use the influxdb_client with python native logging."""
import logging

from influxdb_client import InfluxDBClient


class InfluxLoggingHandler(logging.Handler):
"""
InfluxLoggingHandler instances dispatch logging events to influx.

There is no need to set a Formatter.
The raw input will be passed on to the influx write api.
"""

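    # attribute names present on every plain LogRecord; anything beyond these was injected via `extra`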
    DEFAULT_LOG_RECORD_KEYS = list(logging.makeLogRecord({}).__dict__.keys()) + ['message']

    def __init__(self, *, url, token, org, bucket, client_args=None, write_api_args=None):
        """
        Initialize defaults.

        The arguments `client_args` and `write_api_args` can be dicts of kwargs.
        They are passed on to the InfluxDBClient and write_api calls respectively.
        """
        super().__init__()

        self.bucket = bucket

        client_args = {} if client_args is None else client_args
        self.client = InfluxDBClient(url=url, token=token, org=org, **client_args)

        write_api_args = {} if write_api_args is None else write_api_args
        self.write_api = self.client.write_api(**write_api_args)

    def __del__(self):
        """Make sure all resources are closed."""
        self.close()

    def close(self) -> None:
        """Close the write_api, client and logger."""
        self.write_api.close()
        self.client.close()
        super().close()

    def emit(self, record: logging.LogRecord) -> None:
        """Emit a record via the InfluxDB WriteApi."""
        try:
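            # with no Formatter set, format() falls back to the record's message, so a Point argument is written via its str() (its line protocol)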
            message = self.format(record)
            extra = self._get_extra_values(record)
            return self.write_api.write(record=message, **extra)
        except (KeyboardInterrupt, SystemExit):
            raise
        except (Exception,):
            self.handleError(record)

    def _get_extra_values(self, record: logging.LogRecord) -> dict:
        """
        Extract all items from the record that were injected via extra.

        Example: `logging.debug(msg, extra={key: value, ...})`.
        """
        extra = {'bucket': self.bucket}
        extra.update({key: value for key, value in record.__dict__.items()
                      if key not in self.DEFAULT_LOG_RECORD_KEYS})
        return extra
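For reference, a usage sketch (not part of the diff): keys passed via `extra=` that are not standard `LogRecord` attributes are forwarded by `_get_extra_values()` as keyword arguments to `write_api.write()`, so the bucket, org or write precision can be overridden per log call. The logger name and values below are illustrative.

import logging

from influxdb_client import WritePrecision

# assumes an InfluxLoggingHandler is already attached to this logger,
# as shown in examples/logging_handler.py
data_logger = logging.getLogger('…')
data_logger.debug("my-measurement,host=host1 temperature=25.3 1644825600",
                  extra={'bucket': 'other-bucket', 'write_precision': WritePrecision.S})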
11 changes: 9 additions & 2 deletions influxdb_client/client/write/point.py
@@ -1,6 +1,5 @@
"""Point data structure to represent LineProtocol."""


import math
from builtins import int
from datetime import datetime, timedelta
@@ -146,7 +145,6 @@ def __init__(self, measurement_name):
        self._name = measurement_name
        self._time = None
        self._write_precision = DEFAULT_WRITE_PRECISION
        pass

    def time(self, time, write_precision=DEFAULT_WRITE_PRECISION):
        """
@@ -195,6 +193,15 @@ def write_precision(self):
"""Get precision."""
return self._write_precision

@classmethod
def set_str_rep(cls, rep_function):
"""Set the string representation for all Points."""
cls.__str___rep = rep_function

def __str__(self):
"""Create string representation of this Point."""
return self.to_line_protocol()


def _append_tags(tags):
    _return = []
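A short illustration of what the new `__str__` enables (the values are made up): a `Point` passed to a logging call is serialized to line protocol by the handler's default formatting, which is what `test_can_log_points` in the test file below relies on.

from influxdb_client import Point

point = Point("my-measurement").tag("host", "host1").field("temperature", 25.3)
print(str(point))  # my-measurement,host=host1 temperature=25.3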
149 changes: 149 additions & 0 deletions tests/test_loggingHandler.py
@@ -0,0 +1,149 @@
import logging
import unittest
import unittest.mock

import urllib3

from influxdb_client import InfluxLoggingHandler, InfluxDBClient, WriteApi, WritePrecision, Point


class LoggingBaseTestCase(unittest.TestCase):
    fake_line_record = "tag,field=value 123456"
    URL_TOKEN_ORG = {
        'url': 'http://example.com',
        'token': 'my-token',
        'org': 'my-org',
    }
    BUCKET = 'my-bucket'

    def setUp(self) -> None:
        self.mock_InfluxDBClient = unittest.mock.patch("influxdb_client.client.loggingHandler.InfluxDBClient").start()
        self.mock_client = unittest.mock.MagicMock(spec=InfluxDBClient)
        self.mock_write_api = unittest.mock.MagicMock(spec=WriteApi)
        self.mock_client.write_api.return_value = self.mock_write_api
        self.mock_InfluxDBClient.return_value = self.mock_client

    def gen_handler_and_logger(self):
        self.handler = InfluxLoggingHandler(**self.URL_TOKEN_ORG, bucket=self.BUCKET)
        self.handler.setLevel(logging.DEBUG)

        self.logger = logging.getLogger("test-logger")
        self.logger.setLevel(logging.DEBUG)

    def tearDown(self) -> None:
        unittest.mock.patch.stopall()


class TestHandlerCreation(LoggingBaseTestCase):
    def test_can_create_handler(self):
        self.handler = InfluxLoggingHandler(**self.URL_TOKEN_ORG, bucket=self.BUCKET)
        self.mock_InfluxDBClient.assert_called_once_with(**self.URL_TOKEN_ORG)
        self.assertEqual(self.BUCKET, self.handler.bucket)
        self.mock_client.write_api.assert_called_once_with()

    def test_can_create_handler_with_optional_args_for_client(self):
        self.handler = InfluxLoggingHandler(**self.URL_TOKEN_ORG, bucket=self.BUCKET,
                                            client_args={'arg2': 2.90, 'optArg': 'whot'})
        self.mock_InfluxDBClient.assert_called_once_with(**self.URL_TOKEN_ORG, arg2=2.90, optArg="whot")
        self.mock_client.write_api.assert_called_once_with()

    def test_can_create_handler_with_args_for_write_api(self):
        self.handler = InfluxLoggingHandler(**self.URL_TOKEN_ORG, bucket=self.BUCKET,
                                            client_args={'arg2': 2.90, 'optArg': 'whot'},
                                            write_api_args={'foo': 'bar'})
        self.mock_InfluxDBClient.assert_called_once_with(**self.URL_TOKEN_ORG, arg2=2.90, optArg="whot")
        self.mock_client.write_api.assert_called_once_with(foo='bar')


class CreatedHandlerTestCase(LoggingBaseTestCase):
    def setUp(self) -> None:
        super().setUp()
        self.gen_handler_and_logger()


class LoggingSetupAndTearDown(CreatedHandlerTestCase):
    def test_is_handler(self):
        self.assertIsInstance(self.handler, logging.Handler)

    def test_set_up_client(self):
        self.mock_InfluxDBClient.assert_called_once()

    def test_closes_connections_on_close(self):
        self.handler.close()
        self.mock_write_api.close.assert_called_once()
        self.mock_client.close.assert_called_once()

    def test_handler_can_be_attached_to_logger(self):
        self.logger.addHandler(self.handler)
        self.assertTrue(self.logger.hasHandlers())
        self.assertTrue(self.handler in self.logger.handlers)


class LoggingWithAttachedHandler(CreatedHandlerTestCase):

    def setUp(self) -> None:
        super().setUp()
        self.logger.addHandler(self.handler)


class LoggingHandlerTest(LoggingWithAttachedHandler):

    def test_can_log_str(self):
        self.logger.debug(self.fake_line_record)
        self.mock_write_api.write.assert_called_once_with(bucket="my-bucket", record=self.fake_line_record)

    def test_can_log_points(self):
        point = Point("measurement").field("field_name", "field_value").time(333, WritePrecision.NS)
        self.logger.debug(point)
        self.mock_write_api.write.assert_called_once_with(bucket="my-bucket", record=str(point))

    def test_catches_urllib_exceptions(self):
        self.mock_write_api.write.side_effect = urllib3.exceptions.HTTPError()
        try:
            with unittest.mock.patch("logging.sys.stderr") as _:
                # Handler writes logging errors to stderr. Don't display it in the test output.
                self.logger.debug(self.fake_line_record)
        finally:
            self.mock_write_api.write.side_effect = None

    def test_raises_on_exit(self):
        try:
            self.mock_write_api.write.side_effect = KeyboardInterrupt()
            self.assertRaises(KeyboardInterrupt, self.logger.debug, self.fake_line_record)
            self.mock_write_api.write.side_effect = SystemExit()
            self.assertRaises(SystemExit, self.logger.debug, self.fake_line_record)
        finally:
            self.mock_write_api.write.side_effect = None

    def test_can_set_bucket(self):
        self.handler.bucket = "new-bucket"
        self.logger.debug(self.fake_line_record)
        self.mock_write_api.write.assert_called_once_with(bucket="new-bucket", record=self.fake_line_record)

    def test_can_pass_bucket_on_log(self):
        self.logger.debug(self.fake_line_record, extra={'bucket': "other-bucket"})
        self.mock_write_api.write.assert_called_once_with(bucket="other-bucket", record=self.fake_line_record)

    def test_can_pass_optional_params_on_log(self):
        self.logger.debug(self.fake_line_record, extra={'org': "other-org", 'write_precision': WritePrecision.S,
                                                        "arg3": 3, "arg2": "two"})
        self.mock_write_api.write.assert_called_once_with(bucket="my-bucket", org='other-org',
                                                          record=self.fake_line_record,
                                                          write_precision=WritePrecision.S,
                                                          arg3=3, arg2="two")

    def test_formatter(self):
        class MyFormatter(logging.Formatter):
            def format(self, record: logging.LogRecord) -> str:
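                # discard the real timestamp and force 123 so the expected record below is deterministic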
                time_ns = int(record.created * 1e9) * 0 + 123
                return f"{record.name},level={record.levelname} message=\"{record.msg}\" {time_ns}"

        self.handler.setFormatter(MyFormatter())
        msg = "a debug message"
        self.logger.debug(msg)
        expected_record = f"test-logger,level=DEBUG message=\"{msg}\" 123"
        self.mock_write_api.write.assert_called_once_with(bucket="my-bucket", record=expected_record)


if __name__ == "__main__":
    unittest.main()