Skip to content

Commit

Permalink
feat: Add InfluxLoggingHandler to use the InfluxClient in python nati…
Browse files Browse the repository at this point in the history
…ve logging (#405)
  • Loading branch information
strom-und-spiele authored Feb 28, 2022
1 parent 62a0ba1 commit e66fa77
Show file tree
Hide file tree
Showing 8 changed files with 308 additions and 21 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Features
1. [#412](https://github.com/influxdata/influxdb-client-python/pull/412): `DeleteApi` uses default value from `InfluxDBClient.org` if an `org` parameter is not specified
2. [#405](https://github.com/influxdata/influxdb-client-python/pull/405): Add `InfluxLoggingHandler`. A handler to use the client in native python logging.

### CI
1. [#411](https://github.com/influxdata/influxdb-client-python/pull/411): Use new Codecov uploader for reporting code coverage
Expand Down Expand Up @@ -45,6 +46,7 @@ This release introduces a support for new version of InfluxDB OSS API definition
1. [#408](https://github.com/influxdata/influxdb-client-python/pull/408): Improve error message when the client cannot find organization by name
1. [#407](https://github.com/influxdata/influxdb-client-python/pull/407): Use `pandas.concat()` instead of deprecated `DataFrame.append()` [DataFrame]


## 1.25.0 [2022-01-20]

### Features
Expand Down
3 changes: 2 additions & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- [import_data_set_sync_batching.py](import_data_set_sync_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches for synchronous write into InfluxDB
- [write_api_callbacks.py](write_api_callbacks.py) - How to handle batch events
- [write_structured_data.py](write_structured_data.py) - How to write structured data - [NamedTuple](https://docs.python.org/3/library/collections.html#collections.namedtuple), [Data Classes](https://docs.python.org/3/library/dataclasses.html) - (_requires Python v3.8+_)
- [logging_handler.py](logging_handler.py) - How to set up a python native logging handler that writes to InfluxDB

## Queries
- [query.py](query.py) - How to query data into `FluxTable`s, `Stream` and `CSV`
Expand All @@ -28,4 +29,4 @@
- [influxdb_18_example.py](influxdb_18_example.py) - How to connect to InfluxDB 1.8
- [nanosecond_precision.py](nanosecond_precision.py) - How to use nanoseconds precision
- [invocable_scripts.py](invocable_scripts.py) - How to use Invocable scripts Cloud API to create custom endpoints that query data


54 changes: 54 additions & 0 deletions examples/logging_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""
Show the usage of influx with python native logging.
This is useful if you
* want to log to influx and a local file.
* want to set up influx logging in a project without specifying it in submodules
"""
import datetime
import logging
import time

from influxdb_client import InfluxLoggingHandler, WritePrecision, Point
from influxdb_client.client.write_api import SYNCHRONOUS

DATA_LOGGER_NAME = '…'


def setup_logger():
"""
Set up data logger with the influx logging handler.
This can happen in your core module.
"""
influx_logging_handler = InfluxLoggingHandler(
url="http://localhost:8086", token="my-token", org="my-org", bucket="my-bucket",
client_args={'timeout': 30_000}, # optional configuration of the client
write_api_args={'write_options': SYNCHRONOUS}) # optional configuration of the write api
influx_logging_handler.setLevel(logging.DEBUG)

data_logger = logging.getLogger(DATA_LOGGER_NAME)
data_logger.setLevel(logging.DEBUG)
data_logger.addHandler(influx_logging_handler)
# feel free to add other handlers here.
# if you find yourself writing filters e.g. to only log points to influx, think about adding a PR :)


def use_logger():
"""Use the logger. This can happen in any submodule."""
# `data_logger` will have the influx_logging_handler attached if setup_logger was called somewhere.
data_logger = logging.getLogger(DATA_LOGGER_NAME)
# write a line yourself
data_logger.debug(f"my-measurement,host=host1 temperature=25.3 {int(time.time() * 1e9)}")
# or make use of the influxdb helpers like Point
data_logger.debug(
Point('my-measurement')
.tag('host', 'host1')
.field('temperature', 25.3)
.time(datetime.datetime.utcnow(), WritePrecision.MS)
)


if __name__ == "__main__":
setup_logger()
use_logger()
1 change: 1 addition & 0 deletions influxdb_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@
from influxdb_client.client.users_api import UsersApi
from influxdb_client.client.write_api import WriteApi, WriteOptions
from influxdb_client.client.influxdb_client import InfluxDBClient
from influxdb_client.client.loggingHandler import InfluxLoggingHandler
from influxdb_client.client.write.point import Point

from influxdb_client.version import CLIENT_VERSION
Expand Down
64 changes: 64 additions & 0 deletions influxdb_client/client/loggingHandler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Use the influxdb_client with python native logging."""
import logging

from influxdb_client import InfluxDBClient


class InfluxLoggingHandler(logging.Handler):
"""
InfluxLoggingHandler instances dispatch logging events to influx.
There is no need to set a Formatter.
The raw input will be passed on to the influx write api.
"""

DEFAULT_LOG_RECORD_KEYS = list(logging.makeLogRecord({}).__dict__.keys()) + ['message']

def __init__(self, *, url, token, org, bucket, client_args=None, write_api_args=None):
"""
Initialize defaults.
The arguments `client_args` and `write_api_args` can be dicts of kwargs.
They are passed on to the InfluxDBClient and write_api calls respectively.
"""
super().__init__()

self.bucket = bucket

client_args = {} if client_args is None else client_args
self.client = InfluxDBClient(url=url, token=token, org=org, **client_args)

write_api_args = {} if write_api_args is None else write_api_args
self.write_api = self.client.write_api(**write_api_args)

def __del__(self):
"""Make sure all resources are closed."""
self.close()

def close(self) -> None:
"""Close the write_api, client and logger."""
self.write_api.close()
self.client.close()
super().close()

def emit(self, record: logging.LogRecord) -> None:
"""Emit a record via the influxDB WriteApi."""
try:
message = self.format(record)
extra = self._get_extra_values(record)
return self.write_api.write(record=message, **extra)
except (KeyboardInterrupt, SystemExit):
raise
except (Exception,):
self.handleError(record)

def _get_extra_values(self, record: logging.LogRecord) -> dict:
"""
Extract all items from the record that were injected via extra.
Example: `logging.debug(msg, extra={key: value, ...})`.
"""
extra = {'bucket': self.bucket}
extra.update({key: value for key, value in record.__dict__.items()
if key not in self.DEFAULT_LOG_RECORD_KEYS})
return extra
11 changes: 9 additions & 2 deletions influxdb_client/client/write/point.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Point data structure to represent LineProtocol."""


import math
from builtins import int
from datetime import datetime, timedelta
Expand Down Expand Up @@ -146,7 +145,6 @@ def __init__(self, measurement_name):
self._name = measurement_name
self._time = None
self._write_precision = DEFAULT_WRITE_PRECISION
pass

def time(self, time, write_precision=DEFAULT_WRITE_PRECISION):
"""
Expand Down Expand Up @@ -195,6 +193,15 @@ def write_precision(self):
"""Get precision."""
return self._write_precision

@classmethod
def set_str_rep(cls, rep_function):
"""Set the string representation for all Points."""
cls.__str___rep = rep_function

def __str__(self):
"""Create string representation of this Point."""
return self.to_line_protocol()


def _append_tags(tags):
_return = []
Expand Down
149 changes: 149 additions & 0 deletions tests/test_loggingHandler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import logging
import unittest
import unittest.mock

import urllib3

from influxdb_client import InfluxLoggingHandler, InfluxDBClient, WriteApi, WritePrecision, Point


class LoggingBaseTestCase(unittest.TestCase):
fake_line_record = "tag,field=value 123456"
URL_TOKEN_ORG = {
'url': 'http://example.com',
'token': 'my-token',
'org': 'my-org',
}
BUCKET = 'my-bucket'

def setUp(self) -> None:
self.mock_InfluxDBClient = unittest.mock.patch("influxdb_client.client.loggingHandler.InfluxDBClient").start()
self.mock_client = unittest.mock.MagicMock(spec=InfluxDBClient)
self.mock_write_api = unittest.mock.MagicMock(spec=WriteApi)
self.mock_client.write_api.return_value = self.mock_write_api
self.mock_InfluxDBClient.return_value = self.mock_client

def gen_handler_and_logger(self):
self.handler = InfluxLoggingHandler(**self.URL_TOKEN_ORG, bucket=self.BUCKET)
self.handler.setLevel(logging.DEBUG)

self.logger = logging.getLogger("test-logger")
self.logger.setLevel(logging.DEBUG)

def tearDown(self) -> None:
unittest.mock.patch.stopall()


class TestHandlerCreation(LoggingBaseTestCase):
def test_can_create_handler(self):
self.handler = InfluxLoggingHandler(**self.URL_TOKEN_ORG, bucket=self.BUCKET)
self.mock_InfluxDBClient.assert_called_once_with(**self.URL_TOKEN_ORG)
self.assertEqual(self.BUCKET, self.handler.bucket)
self.mock_client.write_api.assert_called_once_with()

def test_can_create_handler_with_optional_args_for_client(self):
self.handler = InfluxLoggingHandler(**self.URL_TOKEN_ORG, bucket=self.BUCKET,
client_args={'arg2': 2.90, 'optArg': 'whot'})
self.mock_InfluxDBClient.assert_called_once_with(**self.URL_TOKEN_ORG, arg2=2.90, optArg="whot")
self.mock_client.write_api.assert_called_once_with()

def test_can_create_handler_with_args_for_write_api(self):
self.handler = InfluxLoggingHandler(**self.URL_TOKEN_ORG, bucket=self.BUCKET,
client_args={'arg2': 2.90, 'optArg': 'whot'},
write_api_args={'foo': 'bar'})
self.mock_InfluxDBClient.assert_called_once_with(**self.URL_TOKEN_ORG, arg2=2.90, optArg="whot")
self.mock_client.write_api.assert_called_once_with(foo='bar')


class CreatedHandlerTestCase(LoggingBaseTestCase):
def setUp(self) -> None:
super().setUp()
self.gen_handler_and_logger()


class LoggingSetupAndTearDown(CreatedHandlerTestCase):
def test_is_handler(self):
self.assertIsInstance(self.handler, logging.Handler)

def test_set_up_client(self):
self.mock_InfluxDBClient.assert_called_once()

def test_closes_connections_on_close(self):
self.handler.close()
self.mock_write_api.close.assert_called_once()
self.mock_client.close.assert_called_once()

def test_handler_can_be_attached_to_logger(self):
self.logger.addHandler(self.handler)
self.assertTrue(self.logger.hasHandlers())
self.assertTrue(self.handler in self.logger.handlers)


class LoggingWithAttachedHandler(CreatedHandlerTestCase):

def setUp(self) -> None:
super().setUp()
self.logger.addHandler(self.handler)


class LoggingHandlerTest(LoggingWithAttachedHandler):

def test_can_log_str(self):
self.logger.debug(self.fake_line_record)
self.mock_write_api.write.assert_called_once_with(bucket="my-bucket", record=self.fake_line_record)

def test_can_log_points(self):
point = Point("measurement").field("field_name", "field_value").time(333, WritePrecision.NS)
self.logger.debug(point)
self.mock_write_api.write.assert_called_once_with(bucket="my-bucket", record=str(point))

def test_catches_urllib_exceptions(self):
self.mock_write_api.write.side_effect = urllib3.exceptions.HTTPError()
try:
with unittest.mock.patch("logging.sys.stderr") as _:
# Handler writes logging errors to stderr. Don't display it in the test output.
self.logger.debug(self.fake_line_record)
finally:
self.mock_write_api.write.side_effect = None

def test_raises_on_exit(self):
try:
self.mock_write_api.write.side_effect = KeyboardInterrupt()
self.assertRaises(KeyboardInterrupt, self.logger.debug, self.fake_line_record)
self.mock_write_api.write.side_effect = SystemExit()
self.assertRaises(SystemExit, self.logger.debug, self.fake_line_record)
finally:
self.mock_write_api.write.side_effect = None

def test_can_set_bucket(self):
self.handler.bucket = "new-bucket"
self.logger.debug(self.fake_line_record)
self.mock_write_api.write.assert_called_once_with(bucket="new-bucket", record=self.fake_line_record)

def test_can_pass_bucket_on_log(self):
self.logger.debug(self.fake_line_record, extra={'bucket': "other-bucket"})
self.mock_write_api.write.assert_called_once_with(bucket="other-bucket", record=self.fake_line_record)

def test_can_pass_optional_params_on_log(self):
self.logger.debug(self.fake_line_record, extra={'org': "other-org", 'write_precision': WritePrecision.S,
"arg3": 3, "arg2": "two"})
self.mock_write_api.write.assert_called_once_with(bucket="my-bucket", org='other-org',
record=self.fake_line_record,
write_precision=WritePrecision.S,
arg3=3, arg2="two")

def test_formatter(self):
class MyFormatter(logging.Formatter):
def format(self, record: logging.LogRecord) -> str:
time_ns = int(record.created * 1e9) * 0 + 123
return f"{record.name},level={record.levelname} message=\"{record.msg}\" {time_ns}"

self.handler.setFormatter(MyFormatter())
msg = "a debug message"
self.logger.debug(msg)
expected_record = f"test-logger,level=DEBUG message=\"{msg}\" 123"
self.mock_write_api.write.assert_called_once_with(bucket="my-bucket", record=expected_record)


if __name__ == "__main__":
unittest.main()
Loading

0 comments on commit e66fa77

Please sign in to comment.