diff --git a/delfin/api/schemas/storage_capabilities_schema.py b/delfin/api/schemas/storage_capabilities_schema.py index cd8ebac60..7c7f68e9f 100644 --- a/delfin/api/schemas/storage_capabilities_schema.py +++ b/delfin/api/schemas/storage_capabilities_schema.py @@ -16,6 +16,7 @@ 'type': 'object', 'properties': { 'is_historic': {'type': 'boolean'}, + 'performance_metric_retention_window': {'type': 'integer'}, 'resource_metrics': { 'type': 'object', 'properties': { diff --git a/delfin/common/config.py b/delfin/common/config.py index 6c8bad8e8..eea821773 100644 --- a/delfin/common/config.py +++ b/delfin/common/config.py @@ -119,6 +119,11 @@ .DEF_PERFORMANCE_HISTORY_ON_RESCHEDULE, help='default history(in sec) to be collected on a job ' 'reschedule'), + cfg.IntOpt('max_failed_task_retry_window', + default=constants.TelemetryCollection + .MAX_FAILED_TASK_RETRY_WINDOW, + help='Maximum time window (in sec) until which delfin supports ' + 'collection for failed tasks'), cfg.BoolOpt('enable_dynamic_subprocess', default=False, help='Enable dynamic subprocess metrics collection'), diff --git a/delfin/common/constants.py b/delfin/common/constants.py index 31a93bdfc..aff6e6e81 100644 --- a/delfin/common/constants.py +++ b/delfin/common/constants.py @@ -404,6 +404,8 @@ class TelemetryCollection(object): """Default performance collection interval""" DEF_PERFORMANCE_COLLECTION_INTERVAL = 900 DEF_PERFORMANCE_HISTORY_ON_RESCHEDULE = 1800 + """Maximum failed task retry window in seconds""" + MAX_FAILED_TASK_RETRY_WINDOW = 7200 class TelemetryTaskStatus(object): diff --git a/delfin/drivers/fake_storage/__init__.py b/delfin/drivers/fake_storage/__init__.py index 40ff47601..af00de25c 100644 --- a/delfin/drivers/fake_storage/__init__.py +++ b/delfin/drivers/fake_storage/__init__.py @@ -564,6 +564,7 @@ def get_capabilities(context, filters=None): """Get capability of supported driver""" return { 'is_historic': False, + 'performance_metric_retention_window': 4500, 'resource_metrics': { "storage": { "throughput": { diff --git a/delfin/task_manager/scheduler/schedulers/telemetry/performance_collection_handler.py b/delfin/task_manager/scheduler/schedulers/telemetry/performance_collection_handler.py index 255fb1e29..39b7fc40a 100644 --- a/delfin/task_manager/scheduler/schedulers/telemetry/performance_collection_handler.py +++ b/delfin/task_manager/scheduler/schedulers/telemetry/performance_collection_handler.py @@ -15,20 +15,23 @@ from datetime import datetime import six -from delfin.task_manager.scheduler.schedulers.telemetry. \ - failed_performance_collection_handler import \ - FailedPerformanceCollectionHandler +from oslo_config import cfg from oslo_log import log from delfin import db from delfin import exception from delfin.common.constants import TelemetryCollection from delfin.db.sqlalchemy.models import FailedTask +from delfin.drivers import api as driverapi from delfin.task_manager import metrics_rpcapi as metrics_task_rpcapi from delfin.task_manager.scheduler import schedule_manager +from delfin.task_manager.scheduler.schedulers.telemetry. \ + failed_performance_collection_handler import \ + FailedPerformanceCollectionHandler from delfin.task_manager.tasks.telemetry import PerformanceCollectionTask LOG = log.getLogger(__name__) +CONF = cfg.CONF class PerformanceCollectionHandler(object): @@ -39,6 +42,7 @@ def __init__(self, ctx, task_id, storage_id, args, interval, executor): self.args = args self.interval = interval self.metric_task_rpcapi = metrics_task_rpcapi.TaskAPI() + self.driver_api = driverapi.API() self.executor = executor self.scheduler = schedule_manager.SchedulerManager().get_scheduler() @@ -96,10 +100,32 @@ def __call__(self): .format(self.storage_id, self.task_id, self.interval)) def _handle_task_failure(self, start_time, end_time): + failed_task_interval = TelemetryCollection.FAILED_JOB_SCHEDULE_INTERVAL + + try: + # Fetch driver's capability for performance metric retention window + # If driver supports it and if it is within collection range, + # consider it for failed task scheduling + capabilities = self.driver_api.get_capabilities(self.ctx, + self.storage_id) + performance_metric_retention_window \ + = capabilities.get('performance_metric_retention_window') + + if performance_metric_retention_window: + collection_window = performance_metric_retention_window \ + if performance_metric_retention_window <= CONF.telemetry \ + .max_failed_task_retry_window \ + else CONF.telemetry.max_failed_task_retry_window + failed_task_interval = collection_window / TelemetryCollection\ + .MAX_FAILED_JOB_RETRY_COUNT + except Exception as e: + LOG.error("Failed to get driver capabilities during failed task " + "scheduling for storage id :{0}, reason:{1}" + .format(self.storage_id, six.text_type(e))) + failed_task = {FailedTask.storage_id.name: self.storage_id, FailedTask.task_id.name: self.task_id, - FailedTask.interval.name: - TelemetryCollection.FAILED_JOB_SCHEDULE_INTERVAL, + FailedTask.interval.name: failed_task_interval, FailedTask.end_time.name: end_time, FailedTask.start_time.name: start_time, FailedTask.method.name: diff --git a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_performance_collection_handler.py b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_performance_collection_handler.py index 70ee576e2..d07bcc7c5 100644 --- a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_performance_collection_handler.py +++ b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_performance_collection_handler.py @@ -82,10 +82,14 @@ def test_performance_collection_success(self, mock_collect_telemetry, @mock.patch('delfin.db.failed_task_create') @mock.patch('delfin.task_manager.tasks.telemetry' '.PerformanceCollectionTask.collect') - def test_performance_collection_failure(self, mock_collect_telemetry, + @mock.patch('delfin.drivers.api.API.get_capabilities') + def test_performance_collection_failure(self, mock_get_capabilities, + mock_collect_telemetry, mock_failed_task_create, mock_assign_failed_job, mock_task_update): + + mock_get_capabilities.return_value = {} mock_collect_telemetry.return_value = TelemetryTaskStatus. \ TASK_EXEC_STATUS_FAILURE ctx = context.get_admin_context()