Skip to content
This repository has been archived by the owner on Mar 28, 2023. It is now read-only.

Stop processors when fatal errors occur #252

Merged
merged 7 commits into from
Oct 8, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions yupana/processor/abstract_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
from django.db import transaction
from processor.report_consumer import (DB_ERRORS,
QPCReportException,
format_message)
format_message,
stop_all_event_loops)
from prometheus_client import Counter, Gauge, Summary

from api.models import (Report,
Expand Down Expand Up @@ -144,7 +145,10 @@ async def run(self):
"""
while self.should_run:
if not self.report_or_slice:
self.assign_object()
try:
self.assign_object()
except Exception: # pylint:disable=broad-except
stop_all_event_loops()
if self.report_or_slice:
try:
await self.delegate_state()
Expand Down Expand Up @@ -408,11 +412,13 @@ def update_object_state(self, options): # noqa: C901 (too-complex)

except Exception as error:
DB_ERRORS.inc()
self.should_run = False
LOG.error(format_message(
self.prefix,
'Could not update %s record due to the following error %s.' % (
self.object_prefix.lower(), str(error)),
account_number=self.account_number, report_platform_id=self.report_platform_id))
stop_all_event_loops()

def move_candidates_to_failed(self):
"""Before entering a failed state any candidates should be moved to the failed hosts."""
Expand Down
10 changes: 7 additions & 3 deletions yupana/processor/garbage_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
from datetime import datetime, timedelta

import pytz
from processor.report_consumer import DB_ERRORS, format_message
from processor.report_consumer import (DB_ERRORS,
GARBAGE_COLLECTION_LOOP,
format_message)

from api.models import ReportArchive
from config.settings.base import (ARCHIVE_RECORD_RETENTION_PERIOD,
GARBAGE_COLLECTION_INTERVAL)

LOG = logging.getLogger(__name__)
GARBAGE_COLLECTION_LOOP = asyncio.new_event_loop()
# this is how often we want garbage collection to run
# (set to seconds) - default value is 1 week
GARBAGE_COLLECTION_INTERVAL = int(GARBAGE_COLLECTION_INTERVAL)
Expand Down Expand Up @@ -98,7 +99,10 @@ def asyncio_garbage_collection_thread(loop): # pragma: no cover
:returns None
"""
collector = GarbageCollector()
loop.run_until_complete(collector.run())
try:
loop.run_until_complete(collector.run())
except Exception: # pylint: disable=broad-except
pass


def initialize_garbage_collection_loop(): # pragma: no cover
Expand Down
128 changes: 0 additions & 128 deletions yupana/processor/legacy_host_validation_consumer.py

This file was deleted.

Loading