diff --git a/delfin/alert_manager/alert_processor.py b/delfin/alert_manager/alert_processor.py index d521b895b..b9e38c53d 100644 --- a/delfin/alert_manager/alert_processor.py +++ b/delfin/alert_manager/alert_processor.py @@ -48,11 +48,17 @@ def process_alert_info(self, alert): alert) # Fill storage specific info if alert_model: + storage = self.get_storage_from_parsed_alert( + ctxt, storage, alert_model) alert_util.fill_storage_attributes(alert_model, storage) except exception.IncompleteTrapInformation as e: LOG.warn(e) threading.Thread(target=self.sync_storage_alert, args=(ctxt, alert['storage_id'])).start() + except exception.AlertSourceNotFound: + LOG.info("Could not identify alert source from parsed alert. " + "Skipping the dispatch of alert") + return except Exception as e: LOG.error(e) raise exception.InvalidResults( @@ -60,7 +66,43 @@ def process_alert_info(self, alert): # Export to base exporter which handles dispatch for all exporters if alert_model: - self.exporter_manager.dispatch(ctxt, alert_model) + LOG.info("Dispatching one SNMP Trap to {} with sn {}".format( + alert_model['storage_id'], alert_model['serial_number'])) + self.exporter_manager.dispatch(ctxt, [alert_model]) + + def get_storage_from_parsed_alert(self, ctxt, storage, alert_model): + # If parse_alert sets 'serial_number' or 'storage_name' in the + # alert_model, we need to get corresponding storage details + # from the db and fill that in alert_model + storage_sn = alert_model.get('serial_number') + storage_name = alert_model.get('storage_name') + filters = { + "vendor": storage['vendor'], + "model": storage['model'], + } + try: + if storage_sn and storage_sn != storage['serial_number']: + filters['serial_number'] = storage_sn + elif storage_name and storage_name != storage['name']: + filters['name'] = storage_name + else: + return storage + + storage_list = db.storage_get_all(ctxt, filters=filters) + if not storage_list: + msg = "Failed to get destination storage for SNMP Trap. " \ + "Storage with serial number {} or storage name {} " \ + "not found in DB".format(storage_sn, storage_name) + raise exception.AlertSourceNotFound(msg) + db.alert_source_get(ctxt, storage_list[0]['id']) + storage = storage_list[0] + except exception.AlertSourceNotFound: + LOG.info("Storage with serial number {} or name {} " + "is not registered for receiving " + "SNMP Trap".format(storage_sn, storage_name)) + raise + + return storage @coordination.synchronized('sync-trap-{storage_id}', blocking=False) def sync_storage_alert(self, context, storage_id): diff --git a/delfin/alert_manager/trap_receiver.py b/delfin/alert_manager/trap_receiver.py index e9727dcf7..70fc51d1e 100644 --- a/delfin/alert_manager/trap_receiver.py +++ b/delfin/alert_manager/trap_receiver.py @@ -182,12 +182,7 @@ def _get_alert_source_by_host(source_ip): if not alert_source: raise exception.AlertSourceNotFoundWithHost(source_ip) - # This is to make sure unique host is configured each alert source - if len(alert_source) > 1: - msg = (_("Failed to get unique alert source with host %s.") - % source_ip) - raise exception.InvalidResults(msg) - + # Return first configured source that can handle the trap return alert_source[0] def _cb_fun(self, state_reference, context_engine_id, context_name, diff --git a/delfin/api/schemas/storages.py b/delfin/api/schemas/storages.py index e7d765d6a..0c723e434 100644 --- a/delfin/api/schemas/storages.py +++ b/delfin/api/schemas/storages.py @@ -19,6 +19,7 @@ 'properties': { 'vendor': {'type': 'string', 'minLength': 1, 'maxLength': 255}, 'model': {'type': 'string', 'minLength': 1, 'maxLength': 255}, + 'storage_name': {'type': 'string', 'minLength': 1, 'maxLength': 255}, 'rest': { 'type': 'object', 'properties': { diff --git a/delfin/api/v1/access_info.py b/delfin/api/v1/access_info.py index b789a1530..c1717f11b 100644 --- a/delfin/api/v1/access_info.py +++ b/delfin/api/v1/access_info.py @@ -11,12 +11,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import copy + from delfin import db from delfin import cryptor from delfin.api import validation from delfin.api.common import wsgi from delfin.api.schemas import access_info as schema_access_info from delfin.api.views import access_info as access_info_viewer +from delfin.db.sqlalchemy.models import AccessInfo from delfin.common import constants from delfin.drivers import api as driverapi @@ -34,11 +37,38 @@ def show(self, req, id): access_info = db.access_info_get(ctxt, id) return self._view_builder.show(access_info) + def _cm_access_info_update(self, ctxt, access_info, body): + access_info_dict = copy.deepcopy(access_info) + unused = ['created_at', 'updated_at', 'storage_name', + 'storage_id', 'extra_attributes'] + access_info_dict = AccessInfo.to_dict(access_info_dict) + for field in unused: + if access_info_dict.get(field): + access_info_dict.pop(field) + for access in constants.ACCESS_TYPE: + if access_info_dict.get(access): + access_info_dict.pop(access) + + access_info_list = db.access_info_get_all( + ctxt, filters=access_info_dict) + + for cm_access_info in access_info_list: + if cm_access_info['storage_id'] == access_info['storage_id']: + continue + for access in constants.ACCESS_TYPE: + if cm_access_info.get(access): + cm_access_info[access]['password'] = cryptor.decode( + cm_access_info[access]['password']) + if body.get(access): + cm_access_info[access].update(body[access]) + self.driver_api.update_access_info(ctxt, cm_access_info) + @validation.schema(schema_access_info.update) def update(self, req, id, body): """Update storage access information.""" ctxt = req.environ.get('delfin.context') access_info = db.access_info_get(ctxt, id) + self._cm_access_info_update(ctxt, access_info, body) for access in constants.ACCESS_TYPE: if access_info.get(access): access_info[access]['password'] = cryptor.decode( diff --git a/delfin/api/v1/storages.py b/delfin/api/v1/storages.py index 61fd9fb95..19ab666eb 100755 --- a/delfin/api/v1/storages.py +++ b/delfin/api/v1/storages.py @@ -186,17 +186,9 @@ def sync(self, req, id): def _storage_exist(self, context, access_info): access_info_dict = copy.deepcopy(access_info) + access_info_list = access_info_filter( + context, access_info_dict) - # Remove unrelated query fields - unrelated_fields = ['username', 'password'] - for access in constants.ACCESS_TYPE: - if access_info_dict.get(access): - for key in unrelated_fields: - access_info_dict[access].pop(key) - - # Check if storage is registered - access_info_list = db.access_info_get_all(context, - filters=access_info_dict) for _access_info in access_info_list: try: storage = db.storage_get(context, _access_info['storage_id']) @@ -260,3 +252,31 @@ def _set_synced_if_ok(context, storage_id, resource_count): storage['sync_status'] = resource_count * constants.ResourceSync.START storage['updated_at'] = current_time db.storage_update(context, storage['id'], storage) + + +def access_info_filter(context, access_info): + access_info_dict = copy.deepcopy(access_info) + + for access in constants.ACCESS_TYPE: + if access_info_dict.get(access): + access_info_dict.pop(access) + + # Check if storage is registered + access_info_list = db.access_info_get_all(context, + filters=access_info_dict) + filtered_list = [] + for access_info_db in access_info_list: + match = True + for access in constants.ACCESS_TYPE: + access_filter = access_info.get(access) + access_db = access_info_db.get(access) + if match and access_filter: + if not access_db or\ + access_filter['host'] != access_db['host'] or\ + access_filter['port'] != access_db['port']: + match = False + break + if match: + filtered_list.append(access_info_db) + + return filtered_list diff --git a/delfin/context.py b/delfin/context.py index 04817306e..85d8db848 100644 --- a/delfin/context.py +++ b/delfin/context.py @@ -71,6 +71,7 @@ def __init__(self, user_id=None, project_id=None, is_admin=None, self.user_id = self.user self.tenant = project_id or tenant self.project_id = self.tenant + self.storage_id = None self.read_deleted = read_deleted self.remote_address = remote_address @@ -107,6 +108,7 @@ def to_dict(self): values.update({ 'user_id': getattr(self, 'user_id', None), 'project_id': getattr(self, 'project_id', None), + 'storage_id': getattr(self, 'storage_id', None), 'read_deleted': getattr(self, 'read_deleted', None), 'remote_address': getattr(self, 'remote_address', None), 'timestamp': self.timestamp.isoformat() if hasattr( diff --git a/delfin/db/sqlalchemy/models.py b/delfin/db/sqlalchemy/models.py index cabbf750a..7b1e304be 100644 --- a/delfin/db/sqlalchemy/models.py +++ b/delfin/db/sqlalchemy/models.py @@ -51,6 +51,7 @@ class AccessInfo(BASE, DelfinBase): """Represent access info required for storage accessing.""" __tablename__ = "access_info" storage_id = Column(String(36), primary_key=True) + storage_name = Column(String(255)) vendor = Column(String(255)) model = Column(String(255)) rest = Column(JsonEncodedDict) diff --git a/delfin/drivers/api.py b/delfin/drivers/api.py index de471dd9a..de50bc637 100644 --- a/delfin/drivers/api.py +++ b/delfin/drivers/api.py @@ -68,6 +68,8 @@ def update_access_info(self, context, access_info): def remove_storage(self, context, storage_id): """Clear driver instance from driver factory.""" + driver = self.driver_manager.get_driver(context, storage_id=storage_id) + driver.delete_storage(context) self.driver_manager.remove_driver(storage_id) def get_storage(self, context, storage_id): diff --git a/delfin/drivers/driver.py b/delfin/drivers/driver.py index 580ee88f7..0e7eb6c7a 100644 --- a/delfin/drivers/driver.py +++ b/delfin/drivers/driver.py @@ -28,6 +28,14 @@ def __init__(self, **kwargs): """ self.storage_id = kwargs.get('storage_id', None) + def delete_storage(self, context): + """Cleanup storage device information from driver""" + pass + + def add_storage(self, kwargs): + """Add storage device information to driver""" + pass + @abc.abstractmethod def reset_connection(self, context, **kwargs): """ Reset connection with backend with new args """ diff --git a/delfin/drivers/manager.py b/delfin/drivers/manager.py index be665ce52..462870ab8 100644 --- a/delfin/drivers/manager.py +++ b/delfin/drivers/manager.py @@ -23,6 +23,7 @@ from delfin import exception from delfin import utils from delfin import ssl_utils +from delfin.common import constants LOG = log.getLogger(__name__) @@ -54,6 +55,7 @@ def get_driver(self, context, invoke_on_load=True, :type cache_on_load: bool :param kwargs: Parameters from access_info. """ + context.storage_id = kwargs.get('storage_id') kwargs = copy.deepcopy(kwargs) kwargs['verify'] = False ca_path = ssl_utils.get_storage_ca_path() @@ -89,6 +91,7 @@ def _get_driver_obj(self, context, cache_on_load=True, **kwargs): if kwargs['verify']: ssl_utils.reload_certificate(kwargs['verify']) + access_info = copy.deepcopy(kwargs) storage_id = access_info.pop('storage_id') access_info.pop('verify') @@ -98,6 +101,28 @@ def _get_driver_obj(self, context, cache_on_load=True, **kwargs): else: access_info = db.access_info_get( context, storage_id).to_dict() + + access_info_dict = copy.deepcopy(access_info) + remove_fields = ['created_at', 'updated_at', + 'storage_id', 'storage_name', + 'extra_attributes'] + # Remove unrelated query fields + for field in remove_fields: + if access_info_dict.get(field): + access_info_dict.pop(field) + for access in constants.ACCESS_TYPE: + if access_info_dict.get(access): + access_info_dict.pop(access) + + access_info_list = db.access_info_get_all( + context, filters=access_info_dict) + for _access_info in access_info_list: + if _access_info['storage_id'] in self.driver_factory: + driver = self.driver_factory[ + _access_info['storage_id']] + driver.add_storage(access_info) + self.driver_factory[storage_id] = driver + return driver access_info['verify'] = kwargs.get('verify') cls = self._get_driver_cls(**access_info) driver = cls(**access_info) diff --git a/delfin/task_manager/manager.py b/delfin/task_manager/manager.py index 348a44445..7b6d21f4a 100644 --- a/delfin/task_manager/manager.py +++ b/delfin/task_manager/manager.py @@ -21,6 +21,7 @@ from delfin import manager from delfin.drivers import manager as driver_manager +from delfin.drivers import api as driver_api from delfin.task_manager.tasks import alerts, telemetry LOG = log.getLogger(__name__) @@ -51,6 +52,7 @@ def remove_storage_resource(self, context, storage_id, resource_task): def remove_storage_in_cache(self, context, storage_id): LOG.info('Remove storage device in memory for storage id:{0}' .format(storage_id)) + driver_api.API().remove_storage(context, storage_id) drivers = driver_manager.DriverManager() drivers.remove_driver(storage_id) diff --git a/delfin/tests/unit/alert_manager/test_alert_processor.py b/delfin/tests/unit/alert_manager/test_alert_processor.py index 8f7d5e72c..c181e0e57 100644 --- a/delfin/tests/unit/alert_manager/test_alert_processor.py +++ b/delfin/tests/unit/alert_manager/test_alert_processor.py @@ -88,7 +88,7 @@ def test_process_alert_info_success(self, mock_ctxt, mock_export_model, # Verify that model returned by driver is exported mock_export_model.assert_called_once_with(expected_ctxt, - expected_alert_model) + [expected_alert_model]) @mock.patch('delfin.db.storage_get') @mock.patch('delfin.drivers.api.API.parse_alert', diff --git a/delfin/tests/unit/api/fakes.py b/delfin/tests/unit/api/fakes.py index 00b928658..9348cb763 100644 --- a/delfin/tests/unit/api/fakes.py +++ b/delfin/tests/unit/api/fakes.py @@ -155,7 +155,7 @@ def fake_access_info_get_all(context, marker=None, limit=None, sort_keys=None, 'vendor': 'fake_storage', 'rest': { 'host': '10.0.0.76', - 'port': '1234', + 'port': 1234, 'username': 'admin', 'password': b'YWJjZA==' }, diff --git a/delfin/tests/unit/api/v1/test_access_info.py b/delfin/tests/unit/api/v1/test_access_info.py index 8c7c533bd..692bcf555 100644 --- a/delfin/tests/unit/api/v1/test_access_info.py +++ b/delfin/tests/unit/api/v1/test_access_info.py @@ -42,6 +42,7 @@ def test_show(self): "model": "fake_driver", "vendor": "fake_storage", "storage_id": "865ffd4d-f1f7-47de-abc3-5541ef44d0c1", + "storage_name": None, "rest": { "host": "10.0.0.0", "port": 1234, @@ -95,6 +96,7 @@ def test_access_info_update(self): "model": "fake_driver", "vendor": "fake_storage", "storage_id": "865ffd4d-f1f7-47de-abc3-5541ef44d0c1", + "storage_name": None, "rest": { "username": "admin_modified", "host": "10.0.0.0", @@ -124,6 +126,7 @@ def test_show_all(self): "model": "fake_driver", "vendor": "fake_storage", "storage_id": "865ffd4d-f1f7-47de-abc3-5541ef44d0c1", + "storage_name": None, "rest": { "host": "10.0.0.0", "port": 1234, diff --git a/delfin/tests/unit/drivers/test_api.py b/delfin/tests/unit/drivers/test_api.py index 14371d0b5..20f73b625 100644 --- a/delfin/tests/unit/drivers/test_api.py +++ b/delfin/tests/unit/drivers/test_api.py @@ -190,12 +190,14 @@ def test_update_access_info(self, mock_storage_get, msg = "Storage backend could not be found" self.assertIn(msg, str(exc.exception)) + @mock.patch('delfin.drivers.manager.DriverManager.get_driver') @mock.patch('delfin.db.storage_get') @mock.patch('delfin.db.storage_create') @mock.patch('delfin.db.access_info_create') @mock.patch('delfin.db.storage_get_all') def test_remove_storage(self, mock_storage, mock_access_info, - mock_storage_create, mock_get_storage): + mock_storage_create, mock_get_storage, + mock_dm): storage = copy.deepcopy(STORAGE) storage['id'] = '12345' mock_storage.return_value = None @@ -204,6 +206,7 @@ def test_remove_storage(self, mock_storage, mock_access_info, api = API() api.discover_storage(context, ACCESS_INFO) mock_get_storage.return_value = None + mock_dm.return_value = FakeStorageDriver() storage_id = '12345'