Skip to content
This repository has been archived by the owner on Mar 28, 2023. It is now read-only.

Commit

Permalink
Stop processors when fatal errors occur (#252)
Browse files Browse the repository at this point in the history
* Initial changes for adding a shutdown for all processors when fatal errors occur
  • Loading branch information
abaiken authored Oct 8, 2019
1 parent 81ffd5c commit 7e99788
Show file tree
Hide file tree
Showing 10 changed files with 325 additions and 331 deletions.
12 changes: 9 additions & 3 deletions yupana/processor/abstract_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@

import pytz
from django.db import transaction
from processor.processor_utils import (format_message,
stop_all_event_loops)
from processor.report_consumer import (DB_ERRORS,
QPCReportException,
format_message)
QPCReportException)
from prometheus_client import Counter, Gauge, Summary

from api.models import (Report,
Expand Down Expand Up @@ -144,7 +145,10 @@ async def run(self):
"""
while self.should_run:
if not self.report_or_slice:
self.assign_object()
try:
self.assign_object()
except Exception: # pylint:disable=broad-except
stop_all_event_loops()
if self.report_or_slice:
try:
await self.delegate_state()
Expand Down Expand Up @@ -408,11 +412,13 @@ def update_object_state(self, options): # noqa: C901 (too-complex)

except Exception as error:
DB_ERRORS.inc()
self.should_run = False
LOG.error(format_message(
self.prefix,
'Could not update %s record due to the following error %s.' % (
self.object_prefix.lower(), str(error)),
account_number=self.account_number, report_platform_id=self.report_platform_id))
stop_all_event_loops()

def move_candidates_to_failed(self):
"""Before entering a failed state any candidates should be moved to the failed hosts."""
Expand Down
10 changes: 7 additions & 3 deletions yupana/processor/garbage_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
from datetime import datetime, timedelta

import pytz
from processor.report_consumer import DB_ERRORS, format_message
from processor.processor_utils import (GARBAGE_COLLECTION_LOOP,
format_message)
from processor.report_consumer import DB_ERRORS

from api.models import ReportArchive
from config.settings.base import (ARCHIVE_RECORD_RETENTION_PERIOD,
GARBAGE_COLLECTION_INTERVAL)

LOG = logging.getLogger(__name__)
GARBAGE_COLLECTION_LOOP = asyncio.new_event_loop()
# this is how often we want garbage collection to run
# (set to seconds) - default value is 1 week
GARBAGE_COLLECTION_INTERVAL = int(GARBAGE_COLLECTION_INTERVAL)
Expand Down Expand Up @@ -98,7 +99,10 @@ def asyncio_garbage_collection_thread(loop): # pragma: no cover
:returns None
"""
collector = GarbageCollector()
loop.run_until_complete(collector.run())
try:
loop.run_until_complete(collector.run())
except Exception: # pylint: disable=broad-except
pass


def initialize_garbage_collection_loop(): # pragma: no cover
Expand Down
128 changes: 0 additions & 128 deletions yupana/processor/legacy_host_validation_consumer.py

This file was deleted.

83 changes: 83 additions & 0 deletions yupana/processor/processor_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#
# Copyright 2018-2019 Red Hat, Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
"""Utilities for all of the processor classes."""
import asyncio
import logging


LOG = logging.getLogger(__name__)
UPLOAD_REPORT_CONSUMER_LOOP = asyncio.get_event_loop()
REPORT_PROCESSING_LOOP = asyncio.new_event_loop()
SLICE_PROCESSING_LOOP = asyncio.new_event_loop()
GARBAGE_COLLECTION_LOOP = asyncio.new_event_loop()
PROCESSOR_INSTANCES = [] # this list holds processor instances that have kafka components


def format_message(prefix, message, account_number=None,
report_platform_id=None):
"""Format log messages in a consistent way.
:param prefix: (str) A meaningful prefix to be displayed in all caps.
:param message: (str) A short message describing the state
:param account_number: (str) The account sending the report.
:param report_platform_id: (str) The qpc report id.
:returns: (str) containing formatted message
"""
if not report_platform_id and not account_number:
actual_message = 'Report %s - %s' % (prefix, message)
elif account_number and not report_platform_id:
actual_message = 'Report(account=%s) %s - %s' % (account_number, prefix, message)
else:
actual_message = 'Report(account=%s, report_platform_id=%s) %s - %s' % (
account_number,
report_platform_id, prefix,
message)

return actual_message


def stop_all_event_loops():
"""Stop all of the event loops."""
prefix = 'STOPPING EVENT LOOPS'
for i in PROCESSOR_INSTANCES:
try:
# the only processor with a consumer is the ReportConsumer
# so we check the class and stop the consumer if we have a
# ReportConsumer instance - otherwise we stop a producer
if i.__class__.__name__ == 'ReportConsumer':
i.consumer.stop()
else:
i.producer.stop()
except Exception as err: # pylint:disable=broad-except
LOG.error(format_message(
prefix, 'The following error occurred: %s' % err))
try:
LOG.error(format_message(
prefix,
'A fatal error occurred. Shutting down all processors: '))
LOG.info(format_message(prefix, 'Shutting down the report consumer.'))
UPLOAD_REPORT_CONSUMER_LOOP.stop()
LOG.info(format_message(prefix, 'Shutting down the report processor.'))
REPORT_PROCESSING_LOOP.stop()
LOG.info(format_message(prefix, 'Shutting down the report slice processor.'))
SLICE_PROCESSING_LOOP.stop()
LOG.info(format_message(prefix, 'Shutting down the garbage collector.'))
GARBAGE_COLLECTION_LOOP.stop()
except Exception as err: # pylint: disable=broad-except
LOG.error(format_message(
prefix,
str(err)))
Loading

0 comments on commit 7e99788

Please sign in to comment.