Add grafana charts and prometheus metrics to capture db errors (#246)
abaiken authored Sep 23, 2019
1 parent a0fe035 commit eb20402
Showing 5 changed files with 120 additions and 15 deletions.
109 changes: 98 additions & 11 deletions scripts/config/yupana-grafana.json
@@ -18,6 +18,93 @@
"id": 1,
"links": [],
"panels": [
+    {
+      "aliasColors": {
+        "DB errors": "dark-blue"
+      },
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "fill": 1,
+      "fillGradient": 0,
+      "gridPos": {
+        "h": 8,
+        "w": 12,
+        "x": 0,
+        "y": 0
+      },
+      "id": 26,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "nullPointMode": "null",
+      "options": {
+        "dataLinks": []
+      },
+      "percentage": false,
+      "pointradius": 2,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "yupana_db_errors_total",
+          "legendFormat": "DB errors",
+          "refId": "A"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeRegions": [],
+      "timeShift": null,
+      "title": "Database Errors",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
     {
       "aliasColors": {
         "kafka_errors_total{instance=\"docker.for.mac.localhost:8001\",job=\"yupana\"}": "dark-blue"
@@ -31,7 +118,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 0
"y": 8
},
"id": 24,
"legend": {
@@ -127,7 +214,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 8
"y": 16
},
"id": 22,
"legend": {
@@ -266,7 +353,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 16
"y": 24
},
"id": 18,
"legend": {
@@ -350,7 +437,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 24
"y": 32
},
"id": 16,
"legend": {
@@ -449,7 +536,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 32
"y": 40
},
"id": 14,
"legend": {
@@ -559,7 +646,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 40
"y": 48
},
"id": 12,
"legend": {
@@ -654,7 +741,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 48
"y": 56
},
"id": 8,
"legend": {
@@ -752,7 +839,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 56
"y": 64
},
"id": 6,
"interval": "",
@@ -855,7 +942,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 64
"y": 72
},
"id": 4,
"legend": {
@@ -967,7 +1054,7 @@
"h": 9,
"w": 12,
"x": 0,
"y": 72
"y": 80
},
"id": 2,
"legend": {
@@ -1083,5 +1170,5 @@
"timezone": "",
"title": "Yupana Dev",
"uid": "KYkcp45Wk",
"version": 8
"version": 3
}
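
The new "Database Errors" panel graphs yupana_db_errors_total, while the Python change below registers the counter as Counter('yupana_db_errors', ...). The names differ because prometheus_client appends the _total suffix to counters in the scrape output. A minimal sketch of that round trip, assuming the default registry:

    from prometheus_client import Counter, generate_latest

    # Same constructor arguments as in report_consumer.py below.
    DB_ERRORS = Counter('yupana_db_errors', 'Number of db errors')
    DB_ERRORS.inc()

    # This exposition text is what Prometheus scrapes and Grafana queries.
    print(generate_latest().decode())
    # ...
    # yupana_db_errors_total 1.0

Note that the panel plots the raw counter, which grows monotonically and resets when the process restarts; wrapping the expression in rate(), e.g. rate(yupana_db_errors_total[5m]), is a common alternative when the error rate matters more than the running total.
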
10 changes: 8 additions & 2 deletions yupana/processor/abstract_processor.py
@@ -26,7 +26,8 @@

 import pytz
 from django.db import transaction
-from processor.report_consumer import (QPCReportException,
+from processor.report_consumer import (DB_ERRORS,
+                                       QPCReportException,
                                        format_message)
 from prometheus_client import Counter, Gauge, Summary

@@ -155,6 +156,7 @@ async def run(self):
             else:
                 await asyncio.sleep(NEW_REPORT_QUERY_INTERVAL)

+    @DB_ERRORS.count_exceptions()
     def calculate_queued_objects(self, current_time, status_info):
         """Calculate the number of reports waiting to be processed.
@@ -190,6 +192,7 @@ def return_queryset_object(queryset):
         except (Report.DoesNotExist, ReportSlice.DoesNotExist):
             return None

+    @DB_ERRORS.count_exceptions()
     def get_oldest_object_to_retry(self):
         """Grab the oldest report or report slice object to retry.
@@ -227,6 +230,7 @@ def get_oldest_object_to_retry(self):
         # if we haven't returned a retry object, return None
         return None

+    @DB_ERRORS.count_exceptions()
     def get_new_record(self):
         """Grab the newest report or report slice object."""
         # Get the queryset for all of the objects in the NEW state
@@ -403,6 +407,7 @@ def update_object_state(self, options):  # noqa: C901 (too-complex)
             serializer.save()

         except Exception as error:
+            DB_ERRORS.inc()
             LOG.error(format_message(
                 self.prefix,
                 'Could not update %s record due to the following error %s.' % (
@@ -523,7 +528,8 @@ def log_time_stats(self, archived_rep):
             account_number=self.account_number,
             report_platform_id=self.report_platform_id))

-    @transaction.atomic  # noqa: C901 (too-complex)
+    @DB_ERRORS.count_exceptions()  # noqa: C901 (too-complex)
+    @transaction.atomic
     def archive_report_and_slices(self):  # pylint: disable=too-many-statements
         """Archive the report slice objects & associated report."""
         self.prefix = 'ARCHIVING'
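
The decorator form of count_exceptions() only observes exceptions that escape the decorated function; exceptions caught and handled inside it are not counted, which is why update_object_state increments DB_ERRORS explicitly in its except block instead. A self-contained sketch of the decorator's behavior (demo names, not from this repo):

    from prometheus_client import Counter, REGISTRY

    DB_ERRORS = Counter('demo_db_errors', 'Number of db errors')

    @DB_ERRORS.count_exceptions()
    def update_record():
        # Simulated failure: count_exceptions() increments, then re-raises.
        raise RuntimeError('db unavailable')

    try:
        update_record()
    except RuntimeError:
        pass  # the caller still sees the exception

    print(REGISTRY.get_sample_value('demo_db_errors_total'))  # 1.0

The decorator ordering on archive_report_and_slices also matters: with @DB_ERRORS.count_exceptions() outermost, errors raised while the @transaction.atomic block commits or rolls back are counted along with errors from the function body itself.
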
3 changes: 2 additions & 1 deletion yupana/processor/garbage_collection.py
@@ -22,7 +22,7 @@
 from datetime import datetime, timedelta

 import pytz
-from processor.report_consumer import format_message
+from processor.report_consumer import DB_ERRORS, format_message

 from api.models import ReportArchive
 from config.settings.base import (ARCHIVE_RECORD_RETENTION_PERIOD,
@@ -61,6 +61,7 @@ async def run(self):
                 % int(GARBAGE_COLLECTION_INTERVAL)))
             await asyncio.sleep(GARBAGE_COLLECTION_INTERVAL)

+    @DB_ERRORS.count_exceptions()
     def remove_outdated_archives(self):
         """Query for archived reports and delete them if they have come of age."""
         current_time = datetime.now(pytz.utc)
3 changes: 3 additions & 0 deletions yupana/processor/report_consumer.py
@@ -40,7 +40,9 @@
                     'Number of messages uploaded to qpc topic',
                     ['account_number'])

+
 KAFKA_ERRORS = Counter('yupana_kafka_errors', 'Number of Kafka errors')
+DB_ERRORS = Counter('yupana_db_errors', 'Number of db errors')


 def format_message(prefix, message, account_number=None,
@@ -156,6 +158,7 @@ async def save_message_and_ack(consumer, consumer_record):
             prefix, 'Upload service message saved. Ready for processing.'))
         await consumer.commit()
     except Exception as error:  # pylint: disable=broad-except
+        DB_ERRORS.inc()
         LOG.error(format_message(
             prefix,
             'The following error occurred while trying to save and '
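
Both counters live in report_consumer and are imported everywhere else; because Python caches modules, every DB_ERRORS.inc() in abstract_processor, garbage_collection, and report_processor increments the same time series. How the process exposes the metrics to Prometheus is not part of this diff; a common prometheus_client pattern, shown here only as a hedged sketch, is the built-in HTTP exporter (the port is an assumption taken from the dashboard's docker.for.mac.localhost:8001 reference):

    import time

    from prometheus_client import Counter, start_http_server

    KAFKA_ERRORS = Counter('yupana_kafka_errors', 'Number of Kafka errors')
    DB_ERRORS = Counter('yupana_db_errors', 'Number of db errors')

    if __name__ == '__main__':
        start_http_server(8001)  # serves /metrics for Prometheus to scrape
        DB_ERRORS.inc()
        time.sleep(60)  # keep the process alive long enough for a scrape
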
10 changes: 9 additions & 1 deletion yupana/processor/report_processor.py
@@ -33,7 +33,9 @@
 from processor.abstract_processor import (
     AbstractProcessor,
     FAILED_TO_DOWNLOAD, FAILED_TO_VALIDATE, RETRY)
-from processor.report_consumer import (KafkaMsgHandlerError,
+from processor.report_consumer import (DB_ERRORS,
+                                       KAFKA_ERRORS,
+                                       KafkaMsgHandlerError,
                                        QPCReportException,
                                        format_message)
 from prometheus_client import Counter, Gauge
@@ -164,6 +166,7 @@ def transition_to_downloaded(self):
                 account_number=self.account_number))
             self.determine_retry(Report.FAILED_DOWNLOAD, Report.STARTED)

+    @DB_ERRORS.count_exceptions()
     def transition_to_validated(self):
         """Validate that the report contents & move to validated state."""
         self.prefix = 'ATTEMPTING VALIDATE'
@@ -237,6 +240,7 @@ async def transition_to_validation_reported(self):
                 report_platform_id=self.report_platform_id))
             self.determine_retry(Report.FAILED_VALIDATION_REPORTING, Report.VALIDATED)

+    @DB_ERRORS.count_exceptions()
     def create_report_slice(self, options):
         """Create report slice.
@@ -375,6 +379,7 @@ def update_slice_state(self, options, report_slice):  # noqa: C901 (too-complex)
                 account_number=self.account_number,
                 report_platform_id=self.report_platform_id))
         except Exception as error:  # pylint: disable=broad-except
+            DB_ERRORS.inc()
             LOG.error(format_message(
                 self.prefix,
                 'Could not update report slice record due to the following error %s.' % str(error),
@@ -663,6 +668,7 @@ def _extract_and_create_slices(self, report_tar_gz):  # noqa: C901 (too-complex)
                 'Unexpected error reading tar.gz: %s' % str(err),
                 account_number=self.account_number))

+    @KAFKA_ERRORS.count_exceptions()
     async def _send_confirmation(self, file_hash):  # pragma: no cover
         """
         Send kafka validation message to Insights Upload service.
@@ -680,6 +686,7 @@ async def _send_confirmation(self, file_hash):  # pragma: no cover
         try:
             await producer.start()
         except (KafkaConnectionError, TimeoutError):
+            KAFKA_ERRORS.inc()
             await producer.stop()
             raise KafkaMsgHandlerError(
                 format_message(
@@ -704,6 +711,7 @@ async def _send_confirmation(self, file_hash):  # pragma: no cover
         finally:
             await producer.stop()

+    @DB_ERRORS.count_exceptions()
     @transaction.atomic
     def deduplicate_reports(self):
         """If a report with the same id already exists, archive the new report."""
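
_send_confirmation is a coroutine that carries @KAFKA_ERRORS.count_exceptions() and still calls KAFKA_ERRORS.inc() explicitly in the connection-error branch. That belt-and-suspenders approach is sensible: whether the decorator observes exceptions raised across an await depends on the prometheus_client version, while the explicit inc() counts the connection-failure path regardless. The context-manager form of count_exceptions() runs inside the coroutine body, so it does see awaited failures; a self-contained sketch (demo names, not from this repo):

    import asyncio

    from prometheus_client import Counter, REGISTRY

    KAFKA_ERRORS = Counter('demo_kafka_errors', 'Number of Kafka errors')

    async def send_confirmation():
        # The context manager executes inside the coroutine, so it observes
        # exceptions raised on either side of an await.
        with KAFKA_ERRORS.count_exceptions(ConnectionError):
            await asyncio.sleep(0)  # stand-in for producer.start()
            raise ConnectionError('broker unreachable')  # simulated failure

    try:
        asyncio.run(send_confirmation())
    except ConnectionError:
        pass

    print(REGISTRY.get_sample_value('demo_kafka_errors_total'))  # 1.0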
