diff --git a/CHANGELOG.md b/CHANGELOG.md index e0c9a954..810701bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### Features 1. [#412](https://github.com/influxdata/influxdb-client-python/pull/412): `DeleteApi` uses default value from `InfluxDBClient.org` if an `org` parameter is not specified +2. [#405](https://github.com/influxdata/influxdb-client-python/pull/405): Add `InfluxLoggingHandler`. A handler to use the client in native python logging. ### CI 1. [#411](https://github.com/influxdata/influxdb-client-python/pull/411): Use new Codecov uploader for reporting code coverage @@ -45,6 +46,7 @@ This release introduces a support for new version of InfluxDB OSS API definition 1. [#408](https://github.com/influxdata/influxdb-client-python/pull/408): Improve error message when the client cannot find organization by name 1. [#407](https://github.com/influxdata/influxdb-client-python/pull/407): Use `pandas.concat()` instead of deprecated `DataFrame.append()` [DataFrame] + ## 1.25.0 [2022-01-20] ### Features diff --git a/examples/README.md b/examples/README.md index 5ea32ec5..2f262e9a 100644 --- a/examples/README.md +++ b/examples/README.md @@ -9,6 +9,7 @@ - [import_data_set_sync_batching.py](import_data_set_sync_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches for synchronous write into InfluxDB - [write_api_callbacks.py](write_api_callbacks.py) - How to handle batch events - [write_structured_data.py](write_structured_data.py) - How to write structured data - [NamedTuple](https://docs.python.org/3/library/collections.html#collections.namedtuple), [Data Classes](https://docs.python.org/3/library/dataclasses.html) - (_requires Python v3.8+_) +- [logging_handler.py](logging_handler.py) - How to set up a python native logging handler that writes to InfluxDB ## Queries - [query.py](query.py) - How to query data into `FluxTable`s, `Stream` and `CSV` @@ -28,4 +29,4 @@ - [influxdb_18_example.py](influxdb_18_example.py) - How to connect to InfluxDB 1.8 - [nanosecond_precision.py](nanosecond_precision.py) - How to use nanoseconds precision - [invocable_scripts.py](invocable_scripts.py) - How to use Invocable scripts Cloud API to create custom endpoints that query data - \ No newline at end of file + diff --git a/examples/logging_handler.py b/examples/logging_handler.py new file mode 100644 index 00000000..08f2ae05 --- /dev/null +++ b/examples/logging_handler.py @@ -0,0 +1,54 @@ +""" +Show the usage of influx with python native logging. + +This is useful if you +* want to log to influx and a local file. +* want to set up influx logging in a project without specifying it in submodules +""" +import datetime +import logging +import time + +from influxdb_client import InfluxLoggingHandler, WritePrecision, Point +from influxdb_client.client.write_api import SYNCHRONOUS + +DATA_LOGGER_NAME = '…' + + +def setup_logger(): + """ + Set up data logger with the influx logging handler. + + This can happen in your core module. + """ + influx_logging_handler = InfluxLoggingHandler( + url="http://localhost:8086", token="my-token", org="my-org", bucket="my-bucket", + client_args={'timeout': 30_000}, # optional configuration of the client + write_api_args={'write_options': SYNCHRONOUS}) # optional configuration of the write api + influx_logging_handler.setLevel(logging.DEBUG) + + data_logger = logging.getLogger(DATA_LOGGER_NAME) + data_logger.setLevel(logging.DEBUG) + data_logger.addHandler(influx_logging_handler) + # feel free to add other handlers here. + # if you find yourself writing filters e.g. to only log points to influx, think about adding a PR :) + + +def use_logger(): + """Use the logger. This can happen in any submodule.""" + # `data_logger` will have the influx_logging_handler attached if setup_logger was called somewhere. + data_logger = logging.getLogger(DATA_LOGGER_NAME) + # write a line yourself + data_logger.debug(f"my-measurement,host=host1 temperature=25.3 {int(time.time() * 1e9)}") + # or make use of the influxdb helpers like Point + data_logger.debug( + Point('my-measurement') + .tag('host', 'host1') + .field('temperature', 25.3) + .time(datetime.datetime.utcnow(), WritePrecision.MS) + ) + + +if __name__ == "__main__": + setup_logger() + use_logger() diff --git a/influxdb_client/__init__.py b/influxdb_client/__init__.py index 90f2715b..7ce0aca8 100644 --- a/influxdb_client/__init__.py +++ b/influxdb_client/__init__.py @@ -383,6 +383,7 @@ from influxdb_client.client.users_api import UsersApi from influxdb_client.client.write_api import WriteApi, WriteOptions from influxdb_client.client.influxdb_client import InfluxDBClient +from influxdb_client.client.loggingHandler import InfluxLoggingHandler from influxdb_client.client.write.point import Point from influxdb_client.version import CLIENT_VERSION diff --git a/influxdb_client/client/loggingHandler.py b/influxdb_client/client/loggingHandler.py new file mode 100644 index 00000000..445a828d --- /dev/null +++ b/influxdb_client/client/loggingHandler.py @@ -0,0 +1,64 @@ +"""Use the influxdb_client with python native logging.""" +import logging + +from influxdb_client import InfluxDBClient + + +class InfluxLoggingHandler(logging.Handler): + """ + InfluxLoggingHandler instances dispatch logging events to influx. + + There is no need to set a Formatter. + The raw input will be passed on to the influx write api. + """ + + DEFAULT_LOG_RECORD_KEYS = list(logging.makeLogRecord({}).__dict__.keys()) + ['message'] + + def __init__(self, *, url, token, org, bucket, client_args=None, write_api_args=None): + """ + Initialize defaults. + + The arguments `client_args` and `write_api_args` can be dicts of kwargs. + They are passed on to the InfluxDBClient and write_api calls respectively. + """ + super().__init__() + + self.bucket = bucket + + client_args = {} if client_args is None else client_args + self.client = InfluxDBClient(url=url, token=token, org=org, **client_args) + + write_api_args = {} if write_api_args is None else write_api_args + self.write_api = self.client.write_api(**write_api_args) + + def __del__(self): + """Make sure all resources are closed.""" + self.close() + + def close(self) -> None: + """Close the write_api, client and logger.""" + self.write_api.close() + self.client.close() + super().close() + + def emit(self, record: logging.LogRecord) -> None: + """Emit a record via the influxDB WriteApi.""" + try: + message = self.format(record) + extra = self._get_extra_values(record) + return self.write_api.write(record=message, **extra) + except (KeyboardInterrupt, SystemExit): + raise + except (Exception,): + self.handleError(record) + + def _get_extra_values(self, record: logging.LogRecord) -> dict: + """ + Extract all items from the record that were injected via extra. + + Example: `logging.debug(msg, extra={key: value, ...})`. + """ + extra = {'bucket': self.bucket} + extra.update({key: value for key, value in record.__dict__.items() + if key not in self.DEFAULT_LOG_RECORD_KEYS}) + return extra diff --git a/influxdb_client/client/write/point.py b/influxdb_client/client/write/point.py index c9fcbfbc..96d2261e 100644 --- a/influxdb_client/client/write/point.py +++ b/influxdb_client/client/write/point.py @@ -1,6 +1,5 @@ """Point data structure to represent LineProtocol.""" - import math from builtins import int from datetime import datetime, timedelta @@ -146,7 +145,6 @@ def __init__(self, measurement_name): self._name = measurement_name self._time = None self._write_precision = DEFAULT_WRITE_PRECISION - pass def time(self, time, write_precision=DEFAULT_WRITE_PRECISION): """ @@ -195,6 +193,15 @@ def write_precision(self): """Get precision.""" return self._write_precision + @classmethod + def set_str_rep(cls, rep_function): + """Set the string representation for all Points.""" + cls.__str___rep = rep_function + + def __str__(self): + """Create string representation of this Point.""" + return self.to_line_protocol() + def _append_tags(tags): _return = [] diff --git a/tests/test_loggingHandler.py b/tests/test_loggingHandler.py new file mode 100644 index 00000000..238e2031 --- /dev/null +++ b/tests/test_loggingHandler.py @@ -0,0 +1,149 @@ +import logging +import unittest +import unittest.mock + +import urllib3 + +from influxdb_client import InfluxLoggingHandler, InfluxDBClient, WriteApi, WritePrecision, Point + + +class LoggingBaseTestCase(unittest.TestCase): + fake_line_record = "tag,field=value 123456" + URL_TOKEN_ORG = { + 'url': 'http://example.com', + 'token': 'my-token', + 'org': 'my-org', + } + BUCKET = 'my-bucket' + + def setUp(self) -> None: + self.mock_InfluxDBClient = unittest.mock.patch("influxdb_client.client.loggingHandler.InfluxDBClient").start() + self.mock_client = unittest.mock.MagicMock(spec=InfluxDBClient) + self.mock_write_api = unittest.mock.MagicMock(spec=WriteApi) + self.mock_client.write_api.return_value = self.mock_write_api + self.mock_InfluxDBClient.return_value = self.mock_client + + def gen_handler_and_logger(self): + self.handler = InfluxLoggingHandler(**self.URL_TOKEN_ORG, bucket=self.BUCKET) + self.handler.setLevel(logging.DEBUG) + + self.logger = logging.getLogger("test-logger") + self.logger.setLevel(logging.DEBUG) + + def tearDown(self) -> None: + unittest.mock.patch.stopall() + + +class TestHandlerCreation(LoggingBaseTestCase): + def test_can_create_handler(self): + self.handler = InfluxLoggingHandler(**self.URL_TOKEN_ORG, bucket=self.BUCKET) + self.mock_InfluxDBClient.assert_called_once_with(**self.URL_TOKEN_ORG) + self.assertEqual(self.BUCKET, self.handler.bucket) + self.mock_client.write_api.assert_called_once_with() + + def test_can_create_handler_with_optional_args_for_client(self): + self.handler = InfluxLoggingHandler(**self.URL_TOKEN_ORG, bucket=self.BUCKET, + client_args={'arg2': 2.90, 'optArg': 'whot'}) + self.mock_InfluxDBClient.assert_called_once_with(**self.URL_TOKEN_ORG, arg2=2.90, optArg="whot") + self.mock_client.write_api.assert_called_once_with() + + def test_can_create_handler_with_args_for_write_api(self): + self.handler = InfluxLoggingHandler(**self.URL_TOKEN_ORG, bucket=self.BUCKET, + client_args={'arg2': 2.90, 'optArg': 'whot'}, + write_api_args={'foo': 'bar'}) + self.mock_InfluxDBClient.assert_called_once_with(**self.URL_TOKEN_ORG, arg2=2.90, optArg="whot") + self.mock_client.write_api.assert_called_once_with(foo='bar') + + +class CreatedHandlerTestCase(LoggingBaseTestCase): + def setUp(self) -> None: + super().setUp() + self.gen_handler_and_logger() + + +class LoggingSetupAndTearDown(CreatedHandlerTestCase): + def test_is_handler(self): + self.assertIsInstance(self.handler, logging.Handler) + + def test_set_up_client(self): + self.mock_InfluxDBClient.assert_called_once() + + def test_closes_connections_on_close(self): + self.handler.close() + self.mock_write_api.close.assert_called_once() + self.mock_client.close.assert_called_once() + + def test_handler_can_be_attached_to_logger(self): + self.logger.addHandler(self.handler) + self.assertTrue(self.logger.hasHandlers()) + self.assertTrue(self.handler in self.logger.handlers) + + +class LoggingWithAttachedHandler(CreatedHandlerTestCase): + + def setUp(self) -> None: + super().setUp() + self.logger.addHandler(self.handler) + + +class LoggingHandlerTest(LoggingWithAttachedHandler): + + def test_can_log_str(self): + self.logger.debug(self.fake_line_record) + self.mock_write_api.write.assert_called_once_with(bucket="my-bucket", record=self.fake_line_record) + + def test_can_log_points(self): + point = Point("measurement").field("field_name", "field_value").time(333, WritePrecision.NS) + self.logger.debug(point) + self.mock_write_api.write.assert_called_once_with(bucket="my-bucket", record=str(point)) + + def test_catches_urllib_exceptions(self): + self.mock_write_api.write.side_effect = urllib3.exceptions.HTTPError() + try: + with unittest.mock.patch("logging.sys.stderr") as _: + # Handler writes logging errors to stderr. Don't display it in the test output. + self.logger.debug(self.fake_line_record) + finally: + self.mock_write_api.write.side_effect = None + + def test_raises_on_exit(self): + try: + self.mock_write_api.write.side_effect = KeyboardInterrupt() + self.assertRaises(KeyboardInterrupt, self.logger.debug, self.fake_line_record) + self.mock_write_api.write.side_effect = SystemExit() + self.assertRaises(SystemExit, self.logger.debug, self.fake_line_record) + finally: + self.mock_write_api.write.side_effect = None + + def test_can_set_bucket(self): + self.handler.bucket = "new-bucket" + self.logger.debug(self.fake_line_record) + self.mock_write_api.write.assert_called_once_with(bucket="new-bucket", record=self.fake_line_record) + + def test_can_pass_bucket_on_log(self): + self.logger.debug(self.fake_line_record, extra={'bucket': "other-bucket"}) + self.mock_write_api.write.assert_called_once_with(bucket="other-bucket", record=self.fake_line_record) + + def test_can_pass_optional_params_on_log(self): + self.logger.debug(self.fake_line_record, extra={'org': "other-org", 'write_precision': WritePrecision.S, + "arg3": 3, "arg2": "two"}) + self.mock_write_api.write.assert_called_once_with(bucket="my-bucket", org='other-org', + record=self.fake_line_record, + write_precision=WritePrecision.S, + arg3=3, arg2="two") + + def test_formatter(self): + class MyFormatter(logging.Formatter): + def format(self, record: logging.LogRecord) -> str: + time_ns = int(record.created * 1e9) * 0 + 123 + return f"{record.name},level={record.levelname} message=\"{record.msg}\" {time_ns}" + + self.handler.setFormatter(MyFormatter()) + msg = "a debug message" + self.logger.debug(msg) + expected_record = f"test-logger,level=DEBUG message=\"{msg}\" 123" + self.mock_write_api.write.assert_called_once_with(bucket="my-bucket", record=expected_record) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_point.py b/tests/test_point.py index 53fd0cc5..00528c7a 100644 --- a/tests/test_point.py +++ b/tests/test_point.py @@ -11,6 +11,11 @@ class PointTest(unittest.TestCase): + def test_ToStr(self): + point = Point.measurement("h2o").tag("location", "europe").field("level", 2.2) + expected_str = point.to_line_protocol() + self.assertEqual(expected_str, str(point)) + def test_MeasurementEscape(self): point = Point.measurement("h2 o").tag("location", "europe").tag("", "warn").field("level", 2) self.assertEqual(point.to_line_protocol(), "h2\\ o,location=europe level=2i") @@ -36,17 +41,17 @@ def test_TagEmptyValue(self): self.assertEqual("h2o,location=europe level=2i", point.to_line_protocol()) def test_TagEscapingKeyAndValue(self): - point = Point.measurement("h\n2\ro\t_data") \ .tag("new\nline", "new\nline") \ .tag("carriage\rreturn", "carriage\nreturn") \ .tag("t\tab", "t\tab") \ .field("level", 2) - self.assertEqual("h\\n2\\ro\\t_data,carriage\\rreturn=carriage\\nreturn,new\\nline=new\\nline,t\\tab=t\\tab level=2i", point.to_line_protocol()) + self.assertEqual( + "h\\n2\\ro\\t_data,carriage\\rreturn=carriage\\nreturn,new\\nline=new\\nline,t\\tab=t\\tab level=2i", + point.to_line_protocol()) def test_EqualSignEscaping(self): - point = Point.measurement("h=2o") \ .tag("l=ocation", "e=urope") \ .field("l=evel", 2) @@ -391,22 +396,24 @@ def test_backslash(self): def test_numpy_types(self): from influxdb_client.extras import np - point = Point.measurement("h2o")\ - .tag("location", "europe")\ - .field("np.float1", np.float(1.123))\ - .field("np.float2", np.float16(2.123))\ - .field("np.float3", np.float32(3.123))\ - .field("np.float4", np.float64(4.123))\ - .field("np.int1", np.int8(1))\ - .field("np.int2", np.int16(2))\ - .field("np.int3", np.int32(3))\ - .field("np.int4", np.int64(4))\ - .field("np.uint1", np.uint8(5))\ - .field("np.uint2", np.uint16(6))\ - .field("np.uint3", np.uint32(7))\ + point = Point.measurement("h2o") \ + .tag("location", "europe") \ + .field("np.float1", np.float(1.123)) \ + .field("np.float2", np.float16(2.123)) \ + .field("np.float3", np.float32(3.123)) \ + .field("np.float4", np.float64(4.123)) \ + .field("np.int1", np.int8(1)) \ + .field("np.int2", np.int16(2)) \ + .field("np.int3", np.int32(3)) \ + .field("np.int4", np.int64(4)) \ + .field("np.uint1", np.uint8(5)) \ + .field("np.uint2", np.uint16(6)) \ + .field("np.uint3", np.uint32(7)) \ .field("np.uint4", np.uint64(8)) - self.assertEqual("h2o,location=europe np.float1=1.123,np.float2=2.123,np.float3=3.123,np.float4=4.123,np.int1=1i,np.int2=2i,np.int3=3i,np.int4=4i,np.uint1=5i,np.uint2=6i,np.uint3=7i,np.uint4=8i", point.to_line_protocol()) + self.assertEqual( + "h2o,location=europe np.float1=1.123,np.float2=2.123,np.float3=3.123,np.float4=4.123,np.int1=1i,np.int2=2i,np.int3=3i,np.int4=4i,np.uint1=5i,np.uint2=6i,np.uint3=7i,np.uint4=8i", + point.to_line_protocol()) def test_from_dictionary_custom_measurement(self): dictionary = { @@ -457,7 +464,9 @@ def test_from_dictionary_custom_fields(self): record_measurement_key="name", record_tag_keys=["location", "version"], record_field_keys=["pressure", "temperature"]) - self.assertEqual("sensor_pt859,location=warehouse_125,version=2021.06.05.5874 pressure=125i,temperature=10i 1632208639", point.to_line_protocol()) + self.assertEqual( + "sensor_pt859,location=warehouse_125,version=2021.06.05.5874 pressure=125i,temperature=10i 1632208639", + point.to_line_protocol()) def test_from_dictionary_tolerant_to_missing_tags_and_fields(self): dictionary = {