From c704f863b6c4e388a1d0717ee7a745ef364cd36c Mon Sep 17 00:00:00 2001 From: jignesh-crest <95845553+jignesh-crest@users.noreply.github.com> Date: Tue, 21 Mar 2023 17:33:52 +0530 Subject: [PATCH] [GCS]-Parse configurations from Kibana UI (#493) (cherry picked from commit 2144c0f02e7097ca25de6432a85aca8794911d1a) --- connectors/sources/google_cloud_storage.py | 189 +++++++++++------- .../tests/test_google_cloud_storage.py | 60 ++++-- connectors/tests/test_utils.py | 25 +++ connectors/utils.py | 16 ++ 4 files changed, 195 insertions(+), 95 deletions(-) diff --git a/connectors/sources/google_cloud_storage.py b/connectors/sources/google_cloud_storage.py index d183e48a3..db4fa12d2 100644 --- a/connectors/sources/google_cloud_storage.py +++ b/connectors/sources/google_cloud_storage.py @@ -9,7 +9,7 @@ import json import os import urllib.parse -from functools import partial +from functools import cached_property, partial import aiofiles from aiofiles.os import remove @@ -19,7 +19,7 @@ from connectors.logger import logger from connectors.source import BaseDataSource -from connectors.utils import TIKA_SUPPORTED_FILETYPES, convert_to_b64 +from connectors.utils import TIKA_SUPPORTED_FILETYPES, convert_to_b64, get_pem_format CLOUD_STORAGE_READ_ONLY_SCOPE = "https://www.googleapis.com/auth/devstorage.read_only" CLOUD_STORAGE_BASE_URL = "https://console.cloud.google.com/storage/browser/_details/" @@ -59,76 +59,24 @@ ) -class GoogleCloudStorageDataSource(BaseDataSource): - """Google Cloud Storage""" +class GoogleCloudStorageClient: + """A google client to handle api calls made to Google Cloud Storage.""" - name = "Google Cloud Storage" - service_type = "google_cloud_storage" - - def __init__(self, configuration): - """Set up the connection to the Google Cloud Storage Client. + def __init__(self, retry_count, json_credentials): + """Initialize the ServiceAccountCreds class using which api calls will be made. Args: - configuration (DataSourceConfiguration): Object of DataSourceConfiguration class. + retry_count (int): Maximum retries for the failed requests. + json_credentials (dict): Service account credentials json. """ - super().__init__(configuration=configuration) - if not self.configuration["service_account_credentials"]: - raise Exception("service_account_credentials can't be empty.") - - self.credentials = ( - self.configuration["service_account_credentials"] - .strip() - .encode("unicode_escape") - .decode() - ) + self.retry_count = retry_count self.service_account_credentials = ServiceAccountCreds( scopes=[CLOUD_STORAGE_READ_ONLY_SCOPE], - **json.loads( - self.configuration["service_account_credentials"] - if RUNNING_FTEST - else self.credentials - ), + **json_credentials, ) self.user_project_id = self.service_account_credentials.project_id - self.retry_count = self.configuration["retry_count"] - self.enable_content_extraction = self.configuration["enable_content_extraction"] - - @classmethod - def get_default_configuration(cls): - """Get the default configuration for Google Cloud Storage. - - Returns: - dictionary: Default configuration. - """ - default_credentials = { - "type": "service_account", - "project_id": "dummy_project_id", - "private_key_id": "abc", - "private_key": open(DEFAULT_PEM_FILE).read(), - "client_email": "123-abc@developer.gserviceaccount.com", - "client_id": "123-abc.apps.googleusercontent.com", - "auth_uri": "https://accounts.google.com/o/oauth2/auth", - "token_uri": "http://localhost:443/token", - } - return { - "service_account_credentials": { - "value": json.dumps(default_credentials), - "label": "JSON string for Google Cloud service account", - "type": "str", - }, - "retry_count": { - "value": DEFAULT_RETRY_COUNT, - "label": "Maximum retries for failed requests", - "type": "int", - }, - "enable_content_extraction": { - "value": DEFAULT_CONTENT_EXTRACTION, - "label": "Enable content extraction (true/false)", - "type": "bool", - }, - } - async def _api_call( + async def api_call( self, resource, method, @@ -136,7 +84,7 @@ async def _api_call( full_response=False, **kwargs, ): - """Method for adding retries whenever exception raised during an api calls + """Make a GET call for Google Cloud Storage with retry for the failed API calls. Args: resource (aiogoogle.resource.Resource): Resource name for which api call will be made. @@ -178,6 +126,7 @@ async def _api_call( ) async for page_items in first_page_with_next_attached: yield page_items + retry_counter = 0 else: if sub_method: method_object = getattr(method_object, sub_method) @@ -199,17 +148,109 @@ async def _api_call( ) await asyncio.sleep(DEFAULT_WAIT_MULTIPLIER**retry_counter) + +class GoogleCloudStorageDataSource(BaseDataSource): + """Google Cloud Storage""" + + name = "Google Cloud Storage" + service_type = "google_cloud_storage" + + def __init__(self, configuration): + """Set up the connection to the Google Cloud Storage Client. + + Args: + configuration (DataSourceConfiguration): Object of DataSourceConfiguration class. + """ + super().__init__(configuration=configuration) + self.enable_content_extraction = self.configuration["enable_content_extraction"] + + @classmethod + def get_default_configuration(cls): + """Get the default configuration for Google Cloud Storage. + + Returns: + dictionary: Default configuration. + """ + default_credentials = { + "type": "service_account", + "project_id": "dummy_project_id", + "private_key_id": "abc", + "private_key": open(DEFAULT_PEM_FILE).read(), + "client_email": "123-abc@developer.gserviceaccount.com", + "client_id": "123-abc.apps.googleusercontent.com", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "http://localhost:443/token", + } + return { + "service_account_credentials": { + "value": json.dumps(default_credentials), + "label": "Google Cloud service account json", + "type": "str", + }, + "retry_count": { + "value": DEFAULT_RETRY_COUNT, + "label": "Maximum retries for failed requests", + "type": "int", + }, + "enable_content_extraction": { + "value": DEFAULT_CONTENT_EXTRACTION, + "label": "Enable content extraction (true/false)", + "type": "bool", + }, + } + + async def validate_config(self): + """Validates whether user inputs are valid or not for configuration field. + + Raises: + Exception: The service account json is empty. + Exception: The format of service account json is invalid. + """ + if ( + self.configuration["service_account_credentials"] == "" + or self.configuration["service_account_credentials"] is None + ): + raise Exception("Google Cloud service account json can't be empty.") + try: + json.loads(self.configuration["service_account_credentials"]) + except ValueError: + raise Exception("Google Cloud service account is not a valid JSON.") + + @cached_property + def _google_storage_client(self): + """Initialize and return the GoogleCloudStorageClient + + Returns: + GoogleCloudStorageClient: An instance of the GoogleCloudStorageClient. + """ + json_credentials = json.loads(self.configuration["service_account_credentials"]) + + if ( + json_credentials.get("private_key") + and "\n" not in json_credentials["private_key"] + ): + json_credentials["private_key"] = get_pem_format( + key=json_credentials["private_key"].strip(), + max_split=2, + ) + + return GoogleCloudStorageClient( + json_credentials=json_credentials, + retry_count=self.configuration["retry_count"], + ) + async def ping(self): """Verify the connection with Google Cloud Storage""" if RUNNING_FTEST: return + try: await anext( - self._api_call( + self._google_storage_client.api_call( resource="projects", method="serviceAccount", sub_method="get", - projectId=self.user_project_id, + projectId=self._google_storage_client.user_project_id, ) ) logger.info("Successfully connected to the Google Cloud Storage.") @@ -223,12 +264,12 @@ async def fetch_buckets(self): Yields: Dictionary: Contains the list of fetched buckets from Google Cloud Storage. """ - async for bucket in self._api_call( + async for bucket in self._google_storage_client.api_call( resource="buckets", method="list", full_response=True, - project=self.user_project_id, - userProject=self.user_project_id, + project=self._google_storage_client.user_project_id, + userProject=self._google_storage_client.user_project_id, ): yield bucket @@ -242,12 +283,12 @@ async def fetch_blobs(self, buckets): Dictionary: Contains the list of fetched blobs from Google Cloud Storage. """ for bucket in buckets.get("items", []): - async for blob in self._api_call( + async for blob in self._google_storage_client.api_call( resource="objects", method="list", full_response=True, bucket=bucket["id"], - userProject=self.user_project_id, + userProject=self._google_storage_client.user_project_id, ): yield blob @@ -266,7 +307,7 @@ def prepare_blob_document(self, blob): blob_name = urllib.parse.quote(blob_document["name"], safe="'") blob_document[ "url" - ] = f"{CLOUD_STORAGE_BASE_URL}{blob_document['bucket_name']}/{blob_name};tab=live_object?project={self.user_project_id}" + ] = f"{CLOUD_STORAGE_BASE_URL}{blob_document['bucket_name']}/{blob_name};tab=live_object?project={self._google_storage_client.user_project_id}" return blob_document def get_blob_document(self, blobs): @@ -314,13 +355,13 @@ async def get_content(self, blob, timestamp=None, doit=None): source_file_name = "" async with NamedTemporaryFile(mode="wb", delete=False) as async_buffer: await anext( - self._api_call( + self._google_storage_client.api_call( resource="objects", method="get", bucket=blob["bucket_name"], object=blob_name, alt="media", - userProject=self.user_project_id, + userProject=self._google_storage_client.user_project_id, pipe_to=async_buffer, ) ) diff --git a/connectors/sources/tests/test_google_cloud_storage.py b/connectors/sources/tests/test_google_cloud_storage.py index 064508d0d..c566d6130 100644 --- a/connectors/sources/tests/test_google_cloud_storage.py +++ b/connectors/sources/tests/test_google_cloud_storage.py @@ -22,7 +22,7 @@ API_VERSION = "v1" -def get_mocked_source_object(): +def get_gcs_source_object(): """Creates the mocked Google cloud storage object. Returns: @@ -62,10 +62,28 @@ async def test_empty_configuration(): # Setup configuration = DataSourceConfiguration({"service_account_credentials": ""}) + gcs_object = GoogleCloudStorageDataSource(configuration=configuration) # Execute - with pytest.raises(Exception, match="service_account_credentials can't be empty."): - _ = GoogleCloudStorageDataSource(configuration=configuration) + with pytest.raises( + Exception, match="Google Cloud service account json can't be empty." + ): + await gcs_object.validate_config() + + +@pytest.mark.asyncio +async def test_raise_on_invalid_configuration(): + # Setup + configuration = DataSourceConfiguration( + {"service_account_credentials": "{'abc':'bcd','cd'}"} + ) + gcs_object = GoogleCloudStorageDataSource(configuration=configuration) + + # Execute + with pytest.raises( + Exception, match="Google Cloud service account is not a valid JSON" + ): + await gcs_object.validate_config() @pytest.mark.asyncio @@ -78,7 +96,7 @@ async def test_ping_for_successful_connection(catch_stdout, patch_logger): "kind": "storage#serviceAccount", "email_address": "serviceaccount@email.com", } - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() as_service_account_response = asyncio.Future() as_service_account_response.set_result(expected_response) @@ -96,7 +114,7 @@ async def test_ping_for_failed_connection(catch_stdout, patch_logger): # Setup - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() # Execute with mock.patch.object( @@ -155,7 +173,7 @@ def test_get_blob_document(previous_documents_list, updated_documents_list): """ # Setup - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() # Execute and Assert assert updated_documents_list == list( @@ -168,7 +186,7 @@ async def test_fetch_buckets(): """Tests the method which lists the storage buckets available in Google Cloud Storage.""" # Setup - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() expected_response = { "kind": "storage#objects", "items": [ @@ -220,7 +238,7 @@ async def test_fetch_blobs(): """Tests the method responsible to yield blobs from Google Cloud Storage bucket.""" # Setup - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() expected_bucket_response = { "kind": "storage#objects", "items": [ @@ -269,7 +287,7 @@ async def test_get_docs(): """Tests the module responsible to fetch and yield blobs documents from Google Cloud Storage.""" # Setup - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() expected_response = { "kind": "storage#objects", "items": [ @@ -323,7 +341,7 @@ async def test_get_docs_when_no_buckets_present(): """ # Setup - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() expected_response = { "kind": "storage#objects", } @@ -350,7 +368,7 @@ async def test_get_content(): """Test the module responsible for fetching the content of the file if it is extractable.""" # Setup - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() blob_document = { "id": "bucket_1/blob_1/123123123", "component_count": None, @@ -380,7 +398,7 @@ async def test_get_content(): Aiogoogle, "as_service_account", return_value=blob_content_response ): google_client = Aiogoogle( - service_account_creds=mocked_gcs_object.service_account_credentials + service_account_creds=mocked_gcs_object._google_storage_client.service_account_credentials ) storage_client = await google_client.discover( api_name=API_NAME, api_version=API_VERSION @@ -398,7 +416,7 @@ async def test_get_content_when_type_not_supported(): """Test the module responsible for fetching the content of the file if it is not extractable or doit is not true.""" # Setup - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() blob_document = { "_id": "bucket_1/blob_1/123123123", "component_count": None, @@ -419,7 +437,7 @@ async def test_get_content_when_type_not_supported(): # Execute and Assert google_client = Aiogoogle( - service_account_creds=mocked_gcs_object.service_account_credentials + service_account_creds=mocked_gcs_object._google_storage_client.service_account_credentials ) storage_client = await google_client.discover( api_name=API_NAME, api_version=API_VERSION @@ -442,7 +460,7 @@ async def test_get_content_when_file_size_is_large(catch_stdout, patch_logger): """Test the module responsible for fetching the content of the file if it is not extractable or doit is not true.""" # Setup - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() blob_document = { "_id": "bucket_1/blob_1/123123123", "component_count": None, @@ -463,7 +481,7 @@ async def test_get_content_when_file_size_is_large(catch_stdout, patch_logger): # Execute and Assert google_client = Aiogoogle( - service_account_creds=mocked_gcs_object.service_account_credentials + service_account_creds=mocked_gcs_object._google_storage_client.service_account_credentials ) storage_client = await google_client.discover( api_name=API_NAME, api_version=API_VERSION @@ -483,19 +501,19 @@ async def test_get_content_when_file_size_is_large(catch_stdout, patch_logger): @pytest.mark.asyncio async def test_api_call_for_attribute_error(catch_stdout, patch_logger): - """Tests the _api_call method when resource attribute is not present in the getattr.""" + """Tests the api_call method when resource attribute is not present in the getattr.""" # Setup - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() # Execute with pytest.raises(AttributeError): - async for _ in mocked_gcs_object._api_call( + async for _ in mocked_gcs_object._google_storage_client.api_call( resource="buckets_dummy", method="list", full_response=True, - project=mocked_gcs_object.user_project_id, - userProject=mocked_gcs_object.user_project_id, + project=mocked_gcs_object._google_storage_client.user_project_id, + userProject=mocked_gcs_object._google_storage_client.user_project_id, ): print("Method called successfully....") diff --git a/connectors/tests/test_utils.py b/connectors/tests/test_utils.py index 7d3d2f9df..760dd9201 100644 --- a/connectors/tests/test_utils.py +++ b/connectors/tests/test_utils.py @@ -27,6 +27,7 @@ RetryStrategy, convert_to_b64, get_base64_value, + get_pem_format, get_size, next_run, retryable, @@ -336,3 +337,27 @@ async def does_not_raise(): # would fail, if retried once (retry_interval = 5 seconds). Explicit time boundary for this test: 1 second await does_not_raise() + + +def test_get_pem_format(): + """This function tests prepare private key and certificate with dummy values""" + # Setup + expected_formated_pem_key = """-----BEGIN PRIVATE KEY----- +PrivateKey +-----END PRIVATE KEY-----""" + private_key = "-----BEGIN PRIVATE KEY----- PrivateKey -----END PRIVATE KEY-----" + + # Execute + formated_privat_key = get_pem_format(key=private_key, max_split=2) + assert formated_privat_key == expected_formated_pem_key + + # Setup + expected_formated_certificate = """-----BEGIN CERTIFICATE----- +Certificate1 +Certificate2 +-----END CERTIFICATE-----""" + certificate = "-----BEGIN CERTIFICATE----- Certificate1 Certificate2 -----END CERTIFICATE-----" + + # Execute + formated_certificate = get_pem_format(key=certificate, max_split=1) + assert formated_certificate == expected_formated_certificate diff --git a/connectors/utils.py b/connectors/utils.py index 03f0744a4..53d040bb3 100644 --- a/connectors/utils.py +++ b/connectors/utils.py @@ -397,3 +397,19 @@ async def func_to_execute(*args, **kwargs): return func_to_execute return wrapper + + +def get_pem_format(key, max_split=-1): + """Convert key into PEM format. + + Args: + key (str): Key in raw format. + max_split (int): Specifies how many splits to do. Defaults to -1. + + Returns: + string: PEM format + """ + key = key.replace(" ", "\n") + key = " ".join(key.split("\n", max_split)) + key = " ".join(key.rsplit("\n", max_split)) + return key