Skip to content

Commit

Permalink
Failed task scheduling based on driver capability (#763)
Browse files Browse the repository at this point in the history
Co-authored-by: ThisIsClark <liuyuchibubao@gmail.com>
  • Loading branch information
sushanthakumar and ThisIsClark authored Oct 19, 2021
1 parent b8de7c6 commit 116a391
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 6 deletions.
1 change: 1 addition & 0 deletions delfin/api/schemas/storage_capabilities_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
'type': 'object',
'properties': {
'is_historic': {'type': 'boolean'},
'performance_metric_retention_window': {'type': 'integer'},
'resource_metrics': {
'type': 'object',
'properties': {
Expand Down
5 changes: 5 additions & 0 deletions delfin/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@
.DEF_PERFORMANCE_HISTORY_ON_RESCHEDULE,
help='default history(in sec) to be collected on a job '
'reschedule'),
cfg.IntOpt('max_failed_task_retry_window',
default=constants.TelemetryCollection
.MAX_FAILED_TASK_RETRY_WINDOW,
help='Maximum time window (in sec) until which delfin supports '
'collection for failed tasks'),
cfg.BoolOpt('enable_dynamic_subprocess',
default=False,
help='Enable dynamic subprocess metrics collection'),
Expand Down
2 changes: 2 additions & 0 deletions delfin/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,8 @@ class TelemetryCollection(object):
"""Default performance collection interval"""
DEF_PERFORMANCE_COLLECTION_INTERVAL = 900
DEF_PERFORMANCE_HISTORY_ON_RESCHEDULE = 1800
"""Maximum failed task retry window in seconds"""
MAX_FAILED_TASK_RETRY_WINDOW = 7200


class TelemetryTaskStatus(object):
Expand Down
1 change: 1 addition & 0 deletions delfin/drivers/fake_storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ def get_capabilities(context, filters=None):
"""Get capability of supported driver"""
return {
'is_historic': False,
'performance_metric_retention_window': 4500,
'resource_metrics': {
"storage": {
"throughput": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,23 @@
from datetime import datetime

import six
from delfin.task_manager.scheduler.schedulers.telemetry. \
failed_performance_collection_handler import \
FailedPerformanceCollectionHandler
from oslo_config import cfg
from oslo_log import log

from delfin import db
from delfin import exception
from delfin.common.constants import TelemetryCollection
from delfin.db.sqlalchemy.models import FailedTask
from delfin.drivers import api as driverapi
from delfin.task_manager import metrics_rpcapi as metrics_task_rpcapi
from delfin.task_manager.scheduler import schedule_manager
from delfin.task_manager.scheduler.schedulers.telemetry. \
failed_performance_collection_handler import \
FailedPerformanceCollectionHandler
from delfin.task_manager.tasks.telemetry import PerformanceCollectionTask

LOG = log.getLogger(__name__)
CONF = cfg.CONF


class PerformanceCollectionHandler(object):
Expand All @@ -39,6 +42,7 @@ def __init__(self, ctx, task_id, storage_id, args, interval, executor):
self.args = args
self.interval = interval
self.metric_task_rpcapi = metrics_task_rpcapi.TaskAPI()
self.driver_api = driverapi.API()
self.executor = executor
self.scheduler = schedule_manager.SchedulerManager().get_scheduler()

Expand Down Expand Up @@ -96,10 +100,32 @@ def __call__(self):
.format(self.storage_id, self.task_id, self.interval))

def _handle_task_failure(self, start_time, end_time):
failed_task_interval = TelemetryCollection.FAILED_JOB_SCHEDULE_INTERVAL

try:
# Fetch driver's capability for performance metric retention window
# If driver supports it and if it is within collection range,
# consider it for failed task scheduling
capabilities = self.driver_api.get_capabilities(self.ctx,
self.storage_id)
performance_metric_retention_window \
= capabilities.get('performance_metric_retention_window')

if performance_metric_retention_window:
collection_window = performance_metric_retention_window \
if performance_metric_retention_window <= CONF.telemetry \
.max_failed_task_retry_window \
else CONF.telemetry.max_failed_task_retry_window
failed_task_interval = collection_window / TelemetryCollection\
.MAX_FAILED_JOB_RETRY_COUNT
except Exception as e:
LOG.error("Failed to get driver capabilities during failed task "
"scheduling for storage id :{0}, reason:{1}"
.format(self.storage_id, six.text_type(e)))

failed_task = {FailedTask.storage_id.name: self.storage_id,
FailedTask.task_id.name: self.task_id,
FailedTask.interval.name:
TelemetryCollection.FAILED_JOB_SCHEDULE_INTERVAL,
FailedTask.interval.name: failed_task_interval,
FailedTask.end_time.name: end_time,
FailedTask.start_time.name: start_time,
FailedTask.method.name:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,14 @@ def test_performance_collection_success(self, mock_collect_telemetry,
@mock.patch('delfin.db.failed_task_create')
@mock.patch('delfin.task_manager.tasks.telemetry'
'.PerformanceCollectionTask.collect')
def test_performance_collection_failure(self, mock_collect_telemetry,
@mock.patch('delfin.drivers.api.API.get_capabilities')
def test_performance_collection_failure(self, mock_get_capabilities,
mock_collect_telemetry,
mock_failed_task_create,
mock_assign_failed_job,
mock_task_update):

mock_get_capabilities.return_value = {}
mock_collect_telemetry.return_value = TelemetryTaskStatus. \
TASK_EXEC_STATUS_FAILURE
ctx = context.get_admin_context()
Expand Down

0 comments on commit 116a391

Please sign in to comment.