Skip to content

Commit

Permalink
Merge pull request #850 from joseph-v/centrl_mgr
Browse files Browse the repository at this point in the history
Add Centralized Manager support
  • Loading branch information
joseph-v authored May 20, 2022
2 parents a44794d + c0ab246 commit 61fedb2
Show file tree
Hide file tree
Showing 15 changed files with 154 additions and 20 deletions.
44 changes: 43 additions & 1 deletion delfin/alert_manager/alert_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,61 @@ def process_alert_info(self, alert):
alert)
# Fill storage specific info
if alert_model:
storage = self.get_storage_from_parsed_alert(
ctxt, storage, alert_model)
alert_util.fill_storage_attributes(alert_model, storage)
except exception.IncompleteTrapInformation as e:
LOG.warn(e)
threading.Thread(target=self.sync_storage_alert,
args=(ctxt, alert['storage_id'])).start()
except exception.AlertSourceNotFound:
LOG.info("Could not identify alert source from parsed alert. "
"Skipping the dispatch of alert")
return
except Exception as e:
LOG.error(e)
raise exception.InvalidResults(
"Failed to fill the alert model from driver.")

# Export to base exporter which handles dispatch for all exporters
if alert_model:
self.exporter_manager.dispatch(ctxt, alert_model)
LOG.info("Dispatching one SNMP Trap to {} with sn {}".format(
alert_model['storage_id'], alert_model['serial_number']))
self.exporter_manager.dispatch(ctxt, [alert_model])

def get_storage_from_parsed_alert(self, ctxt, storage, alert_model):
# If parse_alert sets 'serial_number' or 'storage_name' in the
# alert_model, we need to get corresponding storage details
# from the db and fill that in alert_model
storage_sn = alert_model.get('serial_number')
storage_name = alert_model.get('storage_name')
filters = {
"vendor": storage['vendor'],
"model": storage['model'],
}
try:
if storage_sn and storage_sn != storage['serial_number']:
filters['serial_number'] = storage_sn
elif storage_name and storage_name != storage['name']:
filters['name'] = storage_name
else:
return storage

storage_list = db.storage_get_all(ctxt, filters=filters)
if not storage_list:
msg = "Failed to get destination storage for SNMP Trap. " \
"Storage with serial number {} or storage name {} " \
"not found in DB".format(storage_sn, storage_name)
raise exception.AlertSourceNotFound(msg)
db.alert_source_get(ctxt, storage_list[0]['id'])
storage = storage_list[0]
except exception.AlertSourceNotFound:
LOG.info("Storage with serial number {} or name {} "
"is not registered for receiving "
"SNMP Trap".format(storage_sn, storage_name))
raise

return storage

@coordination.synchronized('sync-trap-{storage_id}', blocking=False)
def sync_storage_alert(self, context, storage_id):
Expand Down
7 changes: 1 addition & 6 deletions delfin/alert_manager/trap_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,7 @@ def _get_alert_source_by_host(source_ip):
if not alert_source:
raise exception.AlertSourceNotFoundWithHost(source_ip)

# This is to make sure unique host is configured each alert source
if len(alert_source) > 1:
msg = (_("Failed to get unique alert source with host %s.")
% source_ip)
raise exception.InvalidResults(msg)

# Return first configured source that can handle the trap
return alert_source[0]

def _cb_fun(self, state_reference, context_engine_id, context_name,
Expand Down
1 change: 1 addition & 0 deletions delfin/api/schemas/storages.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
'properties': {
'vendor': {'type': 'string', 'minLength': 1, 'maxLength': 255},
'model': {'type': 'string', 'minLength': 1, 'maxLength': 255},
'storage_name': {'type': 'string', 'minLength': 1, 'maxLength': 255},
'rest': {
'type': 'object',
'properties': {
Expand Down
30 changes: 30 additions & 0 deletions delfin/api/v1/access_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy

from delfin import db
from delfin import cryptor
from delfin.api import validation
from delfin.api.common import wsgi
from delfin.api.schemas import access_info as schema_access_info
from delfin.api.views import access_info as access_info_viewer
from delfin.db.sqlalchemy.models import AccessInfo
from delfin.common import constants
from delfin.drivers import api as driverapi

Expand All @@ -34,11 +37,38 @@ def show(self, req, id):
access_info = db.access_info_get(ctxt, id)
return self._view_builder.show(access_info)

def _cm_access_info_update(self, ctxt, access_info, body):
access_info_dict = copy.deepcopy(access_info)
unused = ['created_at', 'updated_at', 'storage_name',
'storage_id', 'extra_attributes']
access_info_dict = AccessInfo.to_dict(access_info_dict)
for field in unused:
if access_info_dict.get(field):
access_info_dict.pop(field)
for access in constants.ACCESS_TYPE:
if access_info_dict.get(access):
access_info_dict.pop(access)

access_info_list = db.access_info_get_all(
ctxt, filters=access_info_dict)

for cm_access_info in access_info_list:
if cm_access_info['storage_id'] == access_info['storage_id']:
continue
for access in constants.ACCESS_TYPE:
if cm_access_info.get(access):
cm_access_info[access]['password'] = cryptor.decode(
cm_access_info[access]['password'])
if body.get(access):
cm_access_info[access].update(body[access])
self.driver_api.update_access_info(ctxt, cm_access_info)

@validation.schema(schema_access_info.update)
def update(self, req, id, body):
"""Update storage access information."""
ctxt = req.environ.get('delfin.context')
access_info = db.access_info_get(ctxt, id)
self._cm_access_info_update(ctxt, access_info, body)
for access in constants.ACCESS_TYPE:
if access_info.get(access):
access_info[access]['password'] = cryptor.decode(
Expand Down
40 changes: 30 additions & 10 deletions delfin/api/v1/storages.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,17 +186,9 @@ def sync(self, req, id):

def _storage_exist(self, context, access_info):
access_info_dict = copy.deepcopy(access_info)
access_info_list = access_info_filter(
context, access_info_dict)

# Remove unrelated query fields
unrelated_fields = ['username', 'password']
for access in constants.ACCESS_TYPE:
if access_info_dict.get(access):
for key in unrelated_fields:
access_info_dict[access].pop(key)

# Check if storage is registered
access_info_list = db.access_info_get_all(context,
filters=access_info_dict)
for _access_info in access_info_list:
try:
storage = db.storage_get(context, _access_info['storage_id'])
Expand Down Expand Up @@ -260,3 +252,31 @@ def _set_synced_if_ok(context, storage_id, resource_count):
storage['sync_status'] = resource_count * constants.ResourceSync.START
storage['updated_at'] = current_time
db.storage_update(context, storage['id'], storage)


def access_info_filter(context, access_info):
access_info_dict = copy.deepcopy(access_info)

for access in constants.ACCESS_TYPE:
if access_info_dict.get(access):
access_info_dict.pop(access)

# Check if storage is registered
access_info_list = db.access_info_get_all(context,
filters=access_info_dict)
filtered_list = []
for access_info_db in access_info_list:
match = True
for access in constants.ACCESS_TYPE:
access_filter = access_info.get(access)
access_db = access_info_db.get(access)
if match and access_filter:
if not access_db or\
access_filter['host'] != access_db['host'] or\
access_filter['port'] != access_db['port']:
match = False
break
if match:
filtered_list.append(access_info_db)

return filtered_list
2 changes: 2 additions & 0 deletions delfin/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def __init__(self, user_id=None, project_id=None, is_admin=None,
self.user_id = self.user
self.tenant = project_id or tenant
self.project_id = self.tenant
self.storage_id = None

self.read_deleted = read_deleted
self.remote_address = remote_address
Expand Down Expand Up @@ -107,6 +108,7 @@ def to_dict(self):
values.update({
'user_id': getattr(self, 'user_id', None),
'project_id': getattr(self, 'project_id', None),
'storage_id': getattr(self, 'storage_id', None),
'read_deleted': getattr(self, 'read_deleted', None),
'remote_address': getattr(self, 'remote_address', None),
'timestamp': self.timestamp.isoformat() if hasattr(
Expand Down
1 change: 1 addition & 0 deletions delfin/db/sqlalchemy/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class AccessInfo(BASE, DelfinBase):
"""Represent access info required for storage accessing."""
__tablename__ = "access_info"
storage_id = Column(String(36), primary_key=True)
storage_name = Column(String(255))
vendor = Column(String(255))
model = Column(String(255))
rest = Column(JsonEncodedDict)
Expand Down
2 changes: 2 additions & 0 deletions delfin/drivers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ def update_access_info(self, context, access_info):

def remove_storage(self, context, storage_id):
"""Clear driver instance from driver factory."""
driver = self.driver_manager.get_driver(context, storage_id=storage_id)
driver.delete_storage(context)
self.driver_manager.remove_driver(storage_id)

def get_storage(self, context, storage_id):
Expand Down
8 changes: 8 additions & 0 deletions delfin/drivers/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ def __init__(self, **kwargs):
"""
self.storage_id = kwargs.get('storage_id', None)

def delete_storage(self, context):
"""Cleanup storage device information from driver"""
pass

def add_storage(self, kwargs):
"""Add storage device information to driver"""
pass

@abc.abstractmethod
def reset_connection(self, context, **kwargs):
""" Reset connection with backend with new args """
Expand Down
25 changes: 25 additions & 0 deletions delfin/drivers/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from delfin import exception
from delfin import utils
from delfin import ssl_utils
from delfin.common import constants

LOG = log.getLogger(__name__)

Expand Down Expand Up @@ -54,6 +55,7 @@ def get_driver(self, context, invoke_on_load=True,
:type cache_on_load: bool
:param kwargs: Parameters from access_info.
"""
context.storage_id = kwargs.get('storage_id')
kwargs = copy.deepcopy(kwargs)
kwargs['verify'] = False
ca_path = ssl_utils.get_storage_ca_path()
Expand Down Expand Up @@ -89,6 +91,7 @@ def _get_driver_obj(self, context, cache_on_load=True, **kwargs):

if kwargs['verify']:
ssl_utils.reload_certificate(kwargs['verify'])

access_info = copy.deepcopy(kwargs)
storage_id = access_info.pop('storage_id')
access_info.pop('verify')
Expand All @@ -98,6 +101,28 @@ def _get_driver_obj(self, context, cache_on_load=True, **kwargs):
else:
access_info = db.access_info_get(
context, storage_id).to_dict()

access_info_dict = copy.deepcopy(access_info)
remove_fields = ['created_at', 'updated_at',
'storage_id', 'storage_name',
'extra_attributes']
# Remove unrelated query fields
for field in remove_fields:
if access_info_dict.get(field):
access_info_dict.pop(field)
for access in constants.ACCESS_TYPE:
if access_info_dict.get(access):
access_info_dict.pop(access)

access_info_list = db.access_info_get_all(
context, filters=access_info_dict)
for _access_info in access_info_list:
if _access_info['storage_id'] in self.driver_factory:
driver = self.driver_factory[
_access_info['storage_id']]
driver.add_storage(access_info)
self.driver_factory[storage_id] = driver
return driver
access_info['verify'] = kwargs.get('verify')
cls = self._get_driver_cls(**access_info)
driver = cls(**access_info)
Expand Down
2 changes: 2 additions & 0 deletions delfin/task_manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from delfin import manager
from delfin.drivers import manager as driver_manager
from delfin.drivers import api as driver_api
from delfin.task_manager.tasks import alerts, telemetry

LOG = log.getLogger(__name__)
Expand Down Expand Up @@ -51,6 +52,7 @@ def remove_storage_resource(self, context, storage_id, resource_task):
def remove_storage_in_cache(self, context, storage_id):
LOG.info('Remove storage device in memory for storage id:{0}'
.format(storage_id))
driver_api.API().remove_storage(context, storage_id)
drivers = driver_manager.DriverManager()
drivers.remove_driver(storage_id)

Expand Down
2 changes: 1 addition & 1 deletion delfin/tests/unit/alert_manager/test_alert_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def test_process_alert_info_success(self, mock_ctxt, mock_export_model,

# Verify that model returned by driver is exported
mock_export_model.assert_called_once_with(expected_ctxt,
expected_alert_model)
[expected_alert_model])

@mock.patch('delfin.db.storage_get')
@mock.patch('delfin.drivers.api.API.parse_alert',
Expand Down
2 changes: 1 addition & 1 deletion delfin/tests/unit/api/fakes.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def fake_access_info_get_all(context, marker=None, limit=None, sort_keys=None,
'vendor': 'fake_storage',
'rest': {
'host': '10.0.0.76',
'port': '1234',
'port': 1234,
'username': 'admin',
'password': b'YWJjZA=='
},
Expand Down
3 changes: 3 additions & 0 deletions delfin/tests/unit/api/v1/test_access_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def test_show(self):
"model": "fake_driver",
"vendor": "fake_storage",
"storage_id": "865ffd4d-f1f7-47de-abc3-5541ef44d0c1",
"storage_name": None,
"rest": {
"host": "10.0.0.0",
"port": 1234,
Expand Down Expand Up @@ -95,6 +96,7 @@ def test_access_info_update(self):
"model": "fake_driver",
"vendor": "fake_storage",
"storage_id": "865ffd4d-f1f7-47de-abc3-5541ef44d0c1",
"storage_name": None,
"rest": {
"username": "admin_modified",
"host": "10.0.0.0",
Expand Down Expand Up @@ -124,6 +126,7 @@ def test_show_all(self):
"model": "fake_driver",
"vendor": "fake_storage",
"storage_id": "865ffd4d-f1f7-47de-abc3-5541ef44d0c1",
"storage_name": None,
"rest": {
"host": "10.0.0.0",
"port": 1234,
Expand Down
5 changes: 4 additions & 1 deletion delfin/tests/unit/drivers/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,14 @@ def test_update_access_info(self, mock_storage_get,
msg = "Storage backend could not be found"
self.assertIn(msg, str(exc.exception))

@mock.patch('delfin.drivers.manager.DriverManager.get_driver')
@mock.patch('delfin.db.storage_get')
@mock.patch('delfin.db.storage_create')
@mock.patch('delfin.db.access_info_create')
@mock.patch('delfin.db.storage_get_all')
def test_remove_storage(self, mock_storage, mock_access_info,
mock_storage_create, mock_get_storage):
mock_storage_create, mock_get_storage,
mock_dm):
storage = copy.deepcopy(STORAGE)
storage['id'] = '12345'
mock_storage.return_value = None
Expand All @@ -204,6 +206,7 @@ def test_remove_storage(self, mock_storage, mock_access_info,
api = API()
api.discover_storage(context, ACCESS_INFO)
mock_get_storage.return_value = None
mock_dm.return_value = FakeStorageDriver()

storage_id = '12345'

Expand Down

0 comments on commit 61fedb2

Please sign in to comment.