diff --git a/scripts/config/yupana-grafana.json b/scripts/config/yupana-grafana.json index 08f259a1..644f6e9a 100644 --- a/scripts/config/yupana-grafana.json +++ b/scripts/config/yupana-grafana.json @@ -18,6 +18,93 @@ "id": 1, "links": [], "panels": [ + { + "aliasColors": { + "DB errors": "dark-blue" + }, + "bars": false, + "dashLength": 10, + "dashes": false, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 26, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "yupana_db_errors_total", + "legendFormat": "DB errors", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Database Errors", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, { "aliasColors": { "kafka_errors_total{instance=\"docker.for.mac.localhost:8001\",job=\"yupana\"}": "dark-blue" @@ -31,7 +118,7 @@ "h": 8, "w": 12, "x": 0, - "y": 0 + "y": 8 }, "id": 24, "legend": { @@ -127,7 +214,7 @@ "h": 8, "w": 12, "x": 0, - "y": 8 + "y": 16 }, "id": 22, "legend": { @@ -266,7 +353,7 @@ "h": 8, "w": 12, "x": 0, - "y": 16 + "y": 24 }, "id": 18, "legend": { @@ -350,7 +437,7 @@ "h": 8, "w": 12, "x": 0, - "y": 24 + "y": 32 }, "id": 16, "legend": { @@ -449,7 +536,7 @@ "h": 8, "w": 12, "x": 0, - "y": 32 + "y": 40 }, "id": 14, "legend": { @@ -559,7 +646,7 @@ "h": 8, "w": 12, "x": 0, - "y": 40 + "y": 48 }, "id": 12, "legend": { @@ -654,7 +741,7 @@ "h": 8, "w": 12, "x": 0, - "y": 48 + "y": 56 }, "id": 8, "legend": { @@ -752,7 +839,7 @@ "h": 8, "w": 12, "x": 0, - "y": 56 + "y": 64 }, "id": 6, "interval": "", @@ -855,7 +942,7 @@ "h": 8, "w": 12, "x": 0, - "y": 64 + "y": 72 }, "id": 4, "legend": { @@ -967,7 +1054,7 @@ "h": 9, "w": 12, "x": 0, - "y": 72 + "y": 80 }, "id": 2, "legend": { @@ -1083,5 +1170,5 @@ "timezone": "", "title": "Yupana Dev", "uid": "KYkcp45Wk", - "version": 8 + "version": 3 } \ No newline at end of file diff --git a/yupana/processor/abstract_processor.py b/yupana/processor/abstract_processor.py index 28c2fb64..1429c24b 100644 --- a/yupana/processor/abstract_processor.py +++ b/yupana/processor/abstract_processor.py @@ -26,7 +26,8 @@ import pytz from django.db import transaction -from processor.report_consumer import (QPCReportException, +from processor.report_consumer import (DB_ERRORS, + QPCReportException, format_message) from prometheus_client import Counter, Gauge, Summary @@ -155,6 +156,7 @@ async def run(self): else: await asyncio.sleep(NEW_REPORT_QUERY_INTERVAL) + @DB_ERRORS.count_exceptions() def calculate_queued_objects(self, current_time, status_info): """Calculate the number of reports waiting to be processed. @@ -190,6 +192,7 @@ def return_queryset_object(queryset): except (Report.DoesNotExist, ReportSlice.DoesNotExist): return None + @DB_ERRORS.count_exceptions() def get_oldest_object_to_retry(self): """Grab the oldest report or report slice object to retry. @@ -227,6 +230,7 @@ def get_oldest_object_to_retry(self): # if we haven't returned a retry object, return None return None + @DB_ERRORS.count_exceptions() def get_new_record(self): """Grab the newest report or report slice object.""" # Get the queryset for all of the objects in the NEW state @@ -403,6 +407,7 @@ def update_object_state(self, options): # noqa: C901 (too-complex) serializer.save() except Exception as error: + DB_ERRORS.inc() LOG.error(format_message( self.prefix, 'Could not update %s record due to the following error %s.' % ( @@ -523,7 +528,8 @@ def log_time_stats(self, archived_rep): account_number=self.account_number, report_platform_id=self.report_platform_id)) - @transaction.atomic # noqa: C901 (too-complex) + @DB_ERRORS.count_exceptions() # noqa: C901 (too-complex) + @transaction.atomic def archive_report_and_slices(self): # pylint: disable=too-many-statements """Archive the report slice objects & associated report.""" self.prefix = 'ARCHIVING' diff --git a/yupana/processor/garbage_collection.py b/yupana/processor/garbage_collection.py index 6c2ded78..bd7badbc 100644 --- a/yupana/processor/garbage_collection.py +++ b/yupana/processor/garbage_collection.py @@ -22,7 +22,7 @@ from datetime import datetime, timedelta import pytz -from processor.report_consumer import format_message +from processor.report_consumer import DB_ERRORS, format_message from api.models import ReportArchive from config.settings.base import (ARCHIVE_RECORD_RETENTION_PERIOD, @@ -61,6 +61,7 @@ async def run(self): % int(GARBAGE_COLLECTION_INTERVAL))) await asyncio.sleep(GARBAGE_COLLECTION_INTERVAL) + @DB_ERRORS.count_exceptions() def remove_outdated_archives(self): """Query for archived reports and delete them if they have come of age.""" current_time = datetime.now(pytz.utc) diff --git a/yupana/processor/report_consumer.py b/yupana/processor/report_consumer.py index 24d0c9a5..c74b2fa8 100644 --- a/yupana/processor/report_consumer.py +++ b/yupana/processor/report_consumer.py @@ -40,7 +40,9 @@ 'Number of messages uploaded to qpc topic', ['account_number']) + KAFKA_ERRORS = Counter('yupana_kafka_errors', 'Number of Kafka errors') +DB_ERRORS = Counter('yupana_db_errors', 'Number of db errors') def format_message(prefix, message, account_number=None, @@ -156,6 +158,7 @@ async def save_message_and_ack(consumer, consumer_record): prefix, 'Upload service message saved. Ready for processing.')) await consumer.commit() except Exception as error: # pylint: disable=broad-except + DB_ERRORS.inc() LOG.error(format_message( prefix, 'The following error occurred while trying to save and ' diff --git a/yupana/processor/report_processor.py b/yupana/processor/report_processor.py index 0392468c..176653f7 100644 --- a/yupana/processor/report_processor.py +++ b/yupana/processor/report_processor.py @@ -33,7 +33,9 @@ from processor.abstract_processor import ( AbstractProcessor, FAILED_TO_DOWNLOAD, FAILED_TO_VALIDATE, RETRY) -from processor.report_consumer import (KafkaMsgHandlerError, +from processor.report_consumer import (DB_ERRORS, + KAFKA_ERRORS, + KafkaMsgHandlerError, QPCReportException, format_message) from prometheus_client import Counter, Gauge @@ -164,6 +166,7 @@ def transition_to_downloaded(self): account_number=self.account_number)) self.determine_retry(Report.FAILED_DOWNLOAD, Report.STARTED) + @DB_ERRORS.count_exceptions() def transition_to_validated(self): """Validate that the report contents & move to validated state.""" self.prefix = 'ATTEMPTING VALIDATE' @@ -237,6 +240,7 @@ async def transition_to_validation_reported(self): report_platform_id=self.report_platform_id)) self.determine_retry(Report.FAILED_VALIDATION_REPORTING, Report.VALIDATED) + @DB_ERRORS.count_exceptions() def create_report_slice(self, options): """Create report slice. @@ -375,6 +379,7 @@ def update_slice_state(self, options, report_slice): # noqa: C901 (too-complex) account_number=self.account_number, report_platform_id=self.report_platform_id)) except Exception as error: # pylint: disable=broad-except + DB_ERRORS.inc() LOG.error(format_message( self.prefix, 'Could not update report slice record due to the following error %s.' % str(error), @@ -663,6 +668,7 @@ def _extract_and_create_slices(self, report_tar_gz): # noqa: C901 (too-complex) 'Unexpected error reading tar.gz: %s' % str(err), account_number=self.account_number)) + @KAFKA_ERRORS.count_exceptions() async def _send_confirmation(self, file_hash): # pragma: no cover """ Send kafka validation message to Insights Upload service. @@ -680,6 +686,7 @@ async def _send_confirmation(self, file_hash): # pragma: no cover try: await producer.start() except (KafkaConnectionError, TimeoutError): + KAFKA_ERRORS.inc() await producer.stop() raise KafkaMsgHandlerError( format_message( @@ -704,6 +711,7 @@ async def _send_confirmation(self, file_hash): # pragma: no cover finally: await producer.stop() + @DB_ERRORS.count_exceptions() @transaction.atomic def deduplicate_reports(self): """If a report with the same id already exists, archive the new report."""