-
Notifications
You must be signed in to change notification settings - Fork 187
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
601f66a
commit f099c63
Showing
2 changed files
with
175 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,47 @@ | ||
""" | ||
Use the influxdb_client together with python native logging | ||
""" | ||
import logging | ||
|
||
from influxdb_client import InfluxDBClient | ||
|
||
class InfluxLoggingHandler: | ||
pass | ||
|
||
class InfluxLoggingHandler(logging.Handler): | ||
DEFAULT_LOG_RECORD_KEYS = logging.makeLogRecord({}).__dict__.keys() | ||
|
||
def __init__(self, *, url, token, org, bucket, client_args=None, write_api_args=None): | ||
super().__init__() | ||
|
||
self.bucket = bucket | ||
|
||
client_args = {} if client_args is None else client_args | ||
self.client = InfluxDBClient(url=url, token=token, org=org, **client_args) | ||
|
||
write_api_args = {} if write_api_args is None else write_api_args | ||
self.write_api = self.client.write_api(**write_api_args) | ||
|
||
def __del__(self): | ||
self.close() | ||
|
||
def close(self) -> None: | ||
self.write_api.close() | ||
self.client.close() | ||
super().close() | ||
|
||
def emit(self, record: logging.LogRecord) -> None: | ||
""" Emit a record via the influxDB WriteApi """ | ||
try: | ||
extra = self._get_extra_values(record) | ||
return self.write_api.write(record=record.msg, **extra) | ||
except (KeyboardInterrupt, SystemExit): | ||
raise | ||
except (Exception,): | ||
self.handleError(record) | ||
|
||
def _get_extra_values(self, record: logging.LogRecord) -> dict: | ||
"""extracts all items from the record that were injected by logging.debug(msg, extra={key: value, ...})""" | ||
extra = {key: value | ||
for key, value in record.__dict__.items() if key not in self.DEFAULT_LOG_RECORD_KEYS} | ||
if 'bucket' not in extra.keys(): | ||
extra['bucket'] = self.bucket | ||
return extra |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,137 @@ | ||
import logging | ||
import unittest | ||
import unittest.mock | ||
|
||
from influxdb_client import InfluxLoggingHandler | ||
import urllib3 | ||
|
||
from influxdb_client import InfluxLoggingHandler, InfluxDBClient, WriteApi, WritePrecision, Point | ||
|
||
class LoggingHandlerTest(unittest.TestCase): | ||
def test_imports(self): | ||
handler = InfluxLoggingHandler() | ||
self.assertIsNotNone(handler) | ||
|
||
class LoggingBaseTestCase(unittest.TestCase): | ||
fake_line_record = "tag,field=value 123456" | ||
URL_TOKEN_ORG = { | ||
'url': 'http://example.com', | ||
'token': 'my-token', | ||
'org': 'my-org', | ||
} | ||
BUCKET = 'my-bucket' | ||
|
||
def setUp(self) -> None: | ||
self.mock_InfluxDBClient = unittest.mock.patch("influxdb_client.client.loggingHandler.InfluxDBClient").start() | ||
self.mock_client = unittest.mock.MagicMock(spec=InfluxDBClient) | ||
self.mock_write_api = unittest.mock.MagicMock(spec=WriteApi) | ||
self.mock_client.write_api.return_value = self.mock_write_api | ||
self.mock_InfluxDBClient.return_value = self.mock_client | ||
|
||
def gen_handler_and_logger(self): | ||
self.handler = InfluxLoggingHandler(**self.URL_TOKEN_ORG, bucket=self.BUCKET) | ||
self.handler.setLevel(logging.DEBUG) | ||
|
||
self.logger = logging.getLogger("test-logger") | ||
self.logger.setLevel(logging.DEBUG) | ||
|
||
def tearDown(self) -> None: | ||
unittest.mock.patch.stopall() | ||
|
||
|
||
class TestHandlerCreation(LoggingBaseTestCase): | ||
def test_can_create_handler(self): | ||
self.handler = InfluxLoggingHandler(**self.URL_TOKEN_ORG, bucket=self.BUCKET) | ||
self.mock_InfluxDBClient.assert_called_once_with(**self.URL_TOKEN_ORG) | ||
self.assertEqual(self.BUCKET, self.handler.bucket) | ||
self.mock_client.write_api.assert_called_once_with() | ||
|
||
def test_can_create_handler_with_optional_args_for_client(self): | ||
self.handler = InfluxLoggingHandler(**self.URL_TOKEN_ORG, bucket=self.BUCKET, | ||
client_args={'arg2': 2.90, 'optArg': 'whot'}) | ||
self.mock_InfluxDBClient.assert_called_once_with(**self.URL_TOKEN_ORG, arg2=2.90, optArg="whot") | ||
self.mock_client.write_api.assert_called_once_with() | ||
|
||
def test_can_create_handler_with_args_for_write_api(self): | ||
self.handler = InfluxLoggingHandler(**self.URL_TOKEN_ORG, bucket=self.BUCKET, | ||
client_args={'arg2': 2.90, 'optArg': 'whot'}, | ||
write_api_args={'foo': 'bar'}) | ||
self.mock_InfluxDBClient.assert_called_once_with(**self.URL_TOKEN_ORG, arg2=2.90, optArg="whot") | ||
self.mock_client.write_api.assert_called_once_with(foo='bar') | ||
|
||
|
||
class CreatedHandlerTestCase(LoggingBaseTestCase): | ||
def setUp(self) -> None: | ||
super().setUp() | ||
self.gen_handler_and_logger() | ||
|
||
|
||
class LoggingSetupAndTearDown(CreatedHandlerTestCase): | ||
def test_is_handler(self): | ||
self.assertIsInstance(self.handler, logging.Handler) | ||
|
||
def test_set_up_client(self): | ||
self.mock_InfluxDBClient.assert_called_once() | ||
|
||
def test_closes_connections_on_close(self): | ||
self.handler.close() | ||
self.mock_write_api.close.assert_called_once() | ||
self.mock_client.close.assert_called_once() | ||
|
||
def test_handler_can_be_attached_to_logger(self): | ||
self.logger.addHandler(self.handler) | ||
self.assertTrue(self.logger.hasHandlers()) | ||
self.assertTrue(self.handler in self.logger.handlers) | ||
|
||
|
||
class LoggingWithAttachedHandler(CreatedHandlerTestCase): | ||
|
||
def setUp(self) -> None: | ||
super().setUp() | ||
self.logger.addHandler(self.handler) | ||
|
||
|
||
class LoggingHandlerTest(LoggingWithAttachedHandler): | ||
|
||
def test_can_log_str(self): | ||
self.logger.debug(self.fake_line_record) | ||
self.mock_write_api.write.assert_called_once_with(bucket="my-bucket", record=self.fake_line_record) | ||
|
||
def test_can_log_points(self): | ||
point = Point("measurement").field("field_name", "field_value").time(333, WritePrecision.NS) | ||
self.logger.debug(point) | ||
self.mock_write_api.write.assert_called_once_with(bucket="my-bucket", record=point) | ||
|
||
def test_catches_urllib_exceptions(self): | ||
self.mock_write_api.write.side_effect = urllib3.exceptions.HTTPError() | ||
try: | ||
with unittest.mock.patch("logging.sys.stderr") as _: | ||
# Handler writes logging errors to stderr. Don't display it in the test output. | ||
self.logger.debug(self.fake_line_record) | ||
finally: | ||
self.mock_write_api.write.side_effect = None | ||
|
||
def test_raises_on_exit(self): | ||
try: | ||
self.mock_write_api.write.side_effect = KeyboardInterrupt() | ||
self.assertRaises(KeyboardInterrupt, self.logger.debug, self.fake_line_record) | ||
self.mock_write_api.write.side_effect = SystemExit() | ||
self.assertRaises(SystemExit, self.logger.debug, self.fake_line_record) | ||
finally: | ||
self.mock_write_api.write.side_effect = None | ||
|
||
def test_can_set_bucket(self): | ||
self.handler.bucket = "new-bucket" | ||
self.logger.debug(self.fake_line_record) | ||
self.mock_write_api.write.assert_called_once_with(bucket="new-bucket", record=self.fake_line_record) | ||
|
||
def test_can_pass_bucket_on_log(self): | ||
self.logger.debug(self.fake_line_record, extra={'bucket': "other-bucket"}) | ||
self.mock_write_api.write.assert_called_once_with(bucket="other-bucket", record=self.fake_line_record) | ||
|
||
def test_can_pass_optional_params_on_log(self): | ||
self.logger.debug(self.fake_line_record, extra={'org': "other-org", 'write_precision': WritePrecision.S, | ||
"arg3": 3, "arg2": "two"}) | ||
self.mock_write_api.write.assert_called_once_with(bucket="my-bucket", org='other-org', | ||
record=self.fake_line_record, | ||
write_precision=WritePrecision.S, | ||
arg3=3, arg2="two") | ||
|
||
|
||
if __name__ == "__main__": | ||
unittest.main() |