From d2aab734d2afdf30d6a174bdddc544d23f9e1080 Mon Sep 17 00:00:00 2001 From: jignesh-crest Date: Thu, 16 Feb 2023 16:34:41 +0530 Subject: [PATCH 01/11] parse configs from ui by formating the private-key parse configs from ui by formating the private-key --- connectors/sources/google_cloud_storage.py | 70 ++++++++++++++----- .../tests/test_google_cloud_storage.py | 29 +++++++- 2 files changed, 79 insertions(+), 20 deletions(-) diff --git a/connectors/sources/google_cloud_storage.py b/connectors/sources/google_cloud_storage.py index d183e48a3..1acfba7f3 100644 --- a/connectors/sources/google_cloud_storage.py +++ b/connectors/sources/google_cloud_storage.py @@ -72,27 +72,13 @@ def __init__(self, configuration): configuration (DataSourceConfiguration): Object of DataSourceConfiguration class. """ super().__init__(configuration=configuration) - if not self.configuration["service_account_credentials"]: - raise Exception("service_account_credentials can't be empty.") - self.credentials = ( - self.configuration["service_account_credentials"] - .strip() - .encode("unicode_escape") - .decode() - ) - self.service_account_credentials = ServiceAccountCreds( - scopes=[CLOUD_STORAGE_READ_ONLY_SCOPE], - **json.loads( - self.configuration["service_account_credentials"] - if RUNNING_FTEST - else self.credentials - ), - ) - self.user_project_id = self.service_account_credentials.project_id self.retry_count = self.configuration["retry_count"] self.enable_content_extraction = self.configuration["enable_content_extraction"] + self.service_account_credentials = None + self.user_project_id = None + @classmethod def get_default_configuration(cls): """Get the default configuration for Google Cloud Storage. @@ -113,7 +99,7 @@ def get_default_configuration(cls): return { "service_account_credentials": { "value": json.dumps(default_credentials), - "label": "JSON string for Google Cloud service account", + "label": "Google Cloud service account json", "type": "str", }, "retry_count": { @@ -199,8 +185,54 @@ async def _api_call( ) await asyncio.sleep(DEFAULT_WAIT_MULTIPLIER**retry_counter) + def _validate_configurations(self): + """Validates whether user input is empty or not for configuration field. + + Raises: + Exception: Configured keys can't be empty. + """ + if self.configuration["service_account_credentials"] == "": + raise Exception("service_account_credentials can't be empty.") + + def get_pem_format(self, private_key, max_split=-1): + """Convert key into PEM format. + + Args: + private_key (str): Private_key in raw format. + max_split (int): Specifies how many splits to do. Defaults to -1. + + Returns: + string: PEM format + """ + private_key = private_key.replace(" ", "\n") + private_key = " ".join(private_key.split("\n", max_split)) + private_key = " ".join(private_key.rsplit("\n", max_split)) + return private_key + + def _initialize_configurations(self): + """Initialize the ServiceAccountCreds""" + json_creds = json.loads(self.configuration["service_account_credentials"]) + if ( + not RUNNING_FTEST + and json_creds.get("private_key") + and "\n" not in json_creds["private_key"] + ): + json_creds["private_key"] = self.get_pem_format( + private_key=json_creds["private_key"].strip(), max_split=2 + ) + + self.service_account_credentials = ServiceAccountCreds( + scopes=[CLOUD_STORAGE_READ_ONLY_SCOPE], + **json_creds, + ) + self.user_project_id = self.service_account_credentials.project_id + async def ping(self): - """Verify the connection with Google Cloud Storage""" + """Verify the configurations and connection with Google Cloud Storage""" + + self._validate_configurations() + self._initialize_configurations() + if RUNNING_FTEST: return try: diff --git a/connectors/sources/tests/test_google_cloud_storage.py b/connectors/sources/tests/test_google_cloud_storage.py index 064508d0d..037fac7ce 100644 --- a/connectors/sources/tests/test_google_cloud_storage.py +++ b/connectors/sources/tests/test_google_cloud_storage.py @@ -32,6 +32,7 @@ def get_mocked_source_object(): {"service_account_credentials": SERVICE_ACCOUNT_CREDENTIALS, "retry_count": 0} ) mocked_gcs_object = GoogleCloudStorageDataSource(configuration=configuration) + mocked_gcs_object._initialize_configurations() return mocked_gcs_object @@ -62,10 +63,11 @@ async def test_empty_configuration(): # Setup configuration = DataSourceConfiguration({"service_account_credentials": ""}) + gcs_object = GoogleCloudStorageDataSource(configuration=configuration) # Execute with pytest.raises(Exception, match="service_account_credentials can't be empty."): - _ = GoogleCloudStorageDataSource(configuration=configuration) + gcs_object._validate_configurations() @pytest.mark.asyncio @@ -499,3 +501,28 @@ async def test_api_call_for_attribute_error(catch_stdout, patch_logger): userProject=mocked_gcs_object.user_project_id, ): print("Method called successfully....") + + +def test_get_pem_format(): + """This function tests prepare private key with dummy values""" + # Setup + expected_formated_pem_key = """-----BEGIN PRIVATE KEY----- +PrivateKey +-----END PRIVATE KEY-----""" + source = get_mocked_source_object() + private_key = "-----BEGIN PRIVATE KEY----- PrivateKey -----END PRIVATE KEY-----" + + # Execute + formated_privat_key = source.get_pem_format(private_key, max_split=2) + assert formated_privat_key == expected_formated_pem_key + + # Setup + expected_formated_certificate = """-----BEGIN CERTIFICATE----- +Certificate1 +Certificate2 +-----END CERTIFICATE-----""" + private_key = "-----BEGIN CERTIFICATE----- Certificate1 Certificate2 -----END CERTIFICATE-----" + + # Execute + formated_privat_key = source.get_pem_format(private_key, max_split=1) + assert formated_privat_key == expected_formated_certificate From bcc60b1b9b30a0cedc14da979193d02437690281 Mon Sep 17 00:00:00 2001 From: jignesh-crest Date: Fri, 17 Feb 2023 18:36:16 +0530 Subject: [PATCH 02/11] Add custom exception, remove unwanted shilds --- connectors/source.py | 10 ++++++ connectors/sources/google_cloud_storage.py | 31 ++++++++++++------- .../tests/test_google_cloud_storage.py | 17 +++++++++- 3 files changed, 46 insertions(+), 12 deletions(-) diff --git a/connectors/source.py b/connectors/source.py index 63ba43f2e..2737d68f6 100644 --- a/connectors/source.py +++ b/connectors/source.py @@ -20,6 +20,16 @@ """ +class DataSourceConfigurationError(Exception): + """Raise when configurations are invalid. + + Args: + Exception (DataSourceConfigurationError): Invalid configurations exception. + """ + + pass + + class Field: def __init__(self, name, label=None, value="", type="str"): if label is None: diff --git a/connectors/sources/google_cloud_storage.py b/connectors/sources/google_cloud_storage.py index 1acfba7f3..2af7a301a 100644 --- a/connectors/sources/google_cloud_storage.py +++ b/connectors/sources/google_cloud_storage.py @@ -10,6 +10,7 @@ import os import urllib.parse from functools import partial +from json.decoder import JSONDecodeError import aiofiles from aiofiles.os import remove @@ -18,7 +19,7 @@ from aiogoogle.auth.creds import ServiceAccountCreds from connectors.logger import logger -from connectors.source import BaseDataSource +from connectors.source import BaseDataSource, DataSourceConfigurationError from connectors.utils import TIKA_SUPPORTED_FILETYPES, convert_to_b64 CLOUD_STORAGE_READ_ONLY_SCOPE = "https://www.googleapis.com/auth/devstorage.read_only" @@ -78,6 +79,7 @@ def __init__(self, configuration): self.service_account_credentials = None self.user_project_id = None + self.json_creds = None @classmethod def get_default_configuration(cls): @@ -186,13 +188,22 @@ async def _api_call( await asyncio.sleep(DEFAULT_WAIT_MULTIPLIER**retry_counter) def _validate_configurations(self): - """Validates whether user input is empty or not for configuration field. + """Validates whether user inputs are valid or not for configuration field. Raises: - Exception: Configured keys can't be empty. + DataSourceConfigurationError: The service account json is empty. + DataSourceConfigurationError: The service account json is in invalid format. """ if self.configuration["service_account_credentials"] == "": - raise Exception("service_account_credentials can't be empty.") + raise DataSourceConfigurationError("Service account json can't be empty.") + try: + self.json_creds = json.loads( + self.configuration["service_account_credentials"] + ) + except JSONDecodeError: + raise DataSourceConfigurationError( + "Service account json is not in valid format." + ) def get_pem_format(self, private_key, max_split=-1): """Convert key into PEM format. @@ -211,19 +222,17 @@ def get_pem_format(self, private_key, max_split=-1): def _initialize_configurations(self): """Initialize the ServiceAccountCreds""" - json_creds = json.loads(self.configuration["service_account_credentials"]) if ( - not RUNNING_FTEST - and json_creds.get("private_key") - and "\n" not in json_creds["private_key"] + self.json_creds.get("private_key") + and "\n" not in self.json_creds["private_key"] ): - json_creds["private_key"] = self.get_pem_format( - private_key=json_creds["private_key"].strip(), max_split=2 + self.json_creds["private_key"] = self.get_pem_format( + private_key=self.json_creds["private_key"].strip(), max_split=2 ) self.service_account_credentials = ServiceAccountCreds( scopes=[CLOUD_STORAGE_READ_ONLY_SCOPE], - **json_creds, + **self.json_creds, ) self.user_project_id = self.service_account_credentials.project_id diff --git a/connectors/sources/tests/test_google_cloud_storage.py b/connectors/sources/tests/test_google_cloud_storage.py index 037fac7ce..efd892ba2 100644 --- a/connectors/sources/tests/test_google_cloud_storage.py +++ b/connectors/sources/tests/test_google_cloud_storage.py @@ -32,6 +32,7 @@ def get_mocked_source_object(): {"service_account_credentials": SERVICE_ACCOUNT_CREDENTIALS, "retry_count": 0} ) mocked_gcs_object = GoogleCloudStorageDataSource(configuration=configuration) + mocked_gcs_object._validate_configurations() mocked_gcs_object._initialize_configurations() return mocked_gcs_object @@ -66,7 +67,21 @@ async def test_empty_configuration(): gcs_object = GoogleCloudStorageDataSource(configuration=configuration) # Execute - with pytest.raises(Exception, match="service_account_credentials can't be empty."): + with pytest.raises(Exception, match="Service account json can't be empty."): + gcs_object._validate_configurations() + + +def test_invalid_configuration(): + """Tests that the service account credential is in invalid json format""" + + # Setup + configuration = DataSourceConfiguration( + {"service_account_credentials": "{'abc':'bcd','cd'}"} + ) + gcs_object = GoogleCloudStorageDataSource(configuration=configuration) + + # Execute + with pytest.raises(Exception, match="Service account json is not in valid format."): gcs_object._validate_configurations() From 1238165c3cc622231dc2500130c02e804a58025d Mon Sep 17 00:00:00 2001 From: jignesh-crest Date: Thu, 2 Mar 2023 10:13:08 +0530 Subject: [PATCH 03/11] use validate config from basesource --- connectors/source.py | 10 --- connectors/sources/google_cloud_storage.py | 64 ++++++++++--------- .../tests/test_google_cloud_storage.py | 16 +++-- 3 files changed, 43 insertions(+), 47 deletions(-) diff --git a/connectors/source.py b/connectors/source.py index 9bf354fc7..e54779c17 100644 --- a/connectors/source.py +++ b/connectors/source.py @@ -20,16 +20,6 @@ ) -class DataSourceConfigurationError(Exception): - """Raise when configurations are invalid. - - Args: - Exception (DataSourceConfigurationError): Invalid configurations exception. - """ - - pass - - class Field: def __init__(self, name, label=None, value="", type="str"): if label is None: diff --git a/connectors/sources/google_cloud_storage.py b/connectors/sources/google_cloud_storage.py index 66dbd49e4..0e1262e04 100644 --- a/connectors/sources/google_cloud_storage.py +++ b/connectors/sources/google_cloud_storage.py @@ -10,7 +10,6 @@ import os import urllib.parse from functools import partial -from json.decoder import JSONDecodeError import aiofiles from aiofiles.os import remove @@ -19,7 +18,7 @@ from aiogoogle.auth.creds import ServiceAccountCreds from connectors.logger import logger -from connectors.source import BaseDataSource, DataSourceConfigurationError +from connectors.source import BaseDataSource from connectors.utils import TIKA_SUPPORTED_FILETYPES, convert_to_b64 CLOUD_STORAGE_READ_ONLY_SCOPE = "https://www.googleapis.com/auth/devstorage.read_only" @@ -73,13 +72,11 @@ def __init__(self, configuration): configuration (DataSourceConfiguration): Object of DataSourceConfiguration class. """ super().__init__(configuration=configuration) - self.retry_count = self.configuration["retry_count"] self.enable_content_extraction = self.configuration["enable_content_extraction"] self.service_account_credentials = None self.user_project_id = None - self.json_creds = None @classmethod def get_default_configuration(cls): @@ -116,6 +113,23 @@ def get_default_configuration(cls): }, } + async def validate_config(self): + """Validates whether user inputs are valid or not for configuration field. + + Raises: + Exception: The service account json is empty. + Exception: The format of service account json is invalid. + """ + if ( + self.configuration["service_account_credentials"] == "" + or self.configuration["service_account_credentials"] is None + ): + raise Exception("Google Cloud service account json can't be empty.") + try: + _ = json.loads(self.configuration["service_account_credentials"]) + except ValueError: + raise Exception("Google Cloud service account is not a valid JSON.") + async def _api_call( self, resource, @@ -187,24 +201,6 @@ async def _api_call( ) await asyncio.sleep(DEFAULT_WAIT_MULTIPLIER**retry_counter) - def _validate_configurations(self): - """Validates whether user inputs are valid or not for configuration field. - - Raises: - DataSourceConfigurationError: The service account json is empty. - DataSourceConfigurationError: The service account json is in invalid format. - """ - if self.configuration["service_account_credentials"] == "": - raise DataSourceConfigurationError("Service account json can't be empty.") - try: - self.json_creds = json.loads( - self.configuration["service_account_credentials"] - ) - except JSONDecodeError: - raise DataSourceConfigurationError( - "Service account json is not in valid format." - ) - def get_pem_format(self, private_key, max_split=-1): """Convert key into PEM format. @@ -222,28 +218,33 @@ def get_pem_format(self, private_key, max_split=-1): def _initialize_configurations(self): """Initialize the ServiceAccountCreds""" + if self.service_account_credentials is not None: + return + + json_credentials = json.loads(self.configuration["service_account_credentials"]) + if ( - self.json_creds.get("private_key") - and "\n" not in self.json_creds["private_key"] + json_credentials.get("private_key") + and "\n" not in json_credentials["private_key"] ): - self.json_creds["private_key"] = self.get_pem_format( - private_key=self.json_creds["private_key"].strip(), max_split=2 + json_credentials["private_key"] = self.get_pem_format( + private_key=json_credentials["private_key"].strip(), + max_split=2, ) self.service_account_credentials = ServiceAccountCreds( scopes=[CLOUD_STORAGE_READ_ONLY_SCOPE], - **self.json_creds, + **json_credentials, ) self.user_project_id = self.service_account_credentials.project_id async def ping(self): - """Verify the configurations and connection with Google Cloud Storage""" - - self._validate_configurations() - self._initialize_configurations() + """Verify the connection with Google Cloud Storage""" if RUNNING_FTEST: return + + self._initialize_configurations() try: await anext( self._api_call( @@ -385,6 +386,7 @@ async def get_docs(self, filtering=None): Yields: dictionary: Documents from Google Cloud Storage. """ + self._initialize_configurations() async for buckets in self.fetch_buckets(): if not buckets.get("items"): continue diff --git a/connectors/sources/tests/test_google_cloud_storage.py b/connectors/sources/tests/test_google_cloud_storage.py index efd892ba2..b3df2adde 100644 --- a/connectors/sources/tests/test_google_cloud_storage.py +++ b/connectors/sources/tests/test_google_cloud_storage.py @@ -32,7 +32,6 @@ def get_mocked_source_object(): {"service_account_credentials": SERVICE_ACCOUNT_CREDENTIALS, "retry_count": 0} ) mocked_gcs_object = GoogleCloudStorageDataSource(configuration=configuration) - mocked_gcs_object._validate_configurations() mocked_gcs_object._initialize_configurations() return mocked_gcs_object @@ -67,11 +66,14 @@ async def test_empty_configuration(): gcs_object = GoogleCloudStorageDataSource(configuration=configuration) # Execute - with pytest.raises(Exception, match="Service account json can't be empty."): - gcs_object._validate_configurations() + with pytest.raises( + Exception, match="Google Cloud service account json can't be empty." + ): + await gcs_object.validate_config() -def test_invalid_configuration(): +@pytest.mark.asyncio +async def test_invalid_configuration(): """Tests that the service account credential is in invalid json format""" # Setup @@ -81,8 +83,10 @@ def test_invalid_configuration(): gcs_object = GoogleCloudStorageDataSource(configuration=configuration) # Execute - with pytest.raises(Exception, match="Service account json is not in valid format."): - gcs_object._validate_configurations() + with pytest.raises( + Exception, match="Google Cloud service account is not a valid JSON" + ): + await gcs_object.validate_config() @pytest.mark.asyncio From fa2acc61519bcc4c9a4bb0e6810f8ff6b8112ded Mon Sep 17 00:00:00 2001 From: jignesh-crest <95845553+jignesh-crest@users.noreply.github.com> Date: Thu, 2 Mar 2023 14:21:05 +0530 Subject: [PATCH 04/11] don't save the parsed json Co-authored-by: Chenhui Wang <54903978+wangch079@users.noreply.github.com> --- connectors/sources/google_cloud_storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connectors/sources/google_cloud_storage.py b/connectors/sources/google_cloud_storage.py index 0e1262e04..145715d7b 100644 --- a/connectors/sources/google_cloud_storage.py +++ b/connectors/sources/google_cloud_storage.py @@ -126,7 +126,7 @@ async def validate_config(self): ): raise Exception("Google Cloud service account json can't be empty.") try: - _ = json.loads(self.configuration["service_account_credentials"]) + json.loads(self.configuration["service_account_credentials"]) except ValueError: raise Exception("Google Cloud service account is not a valid JSON.") From 45c804cdaa47dd02f343ee99867eb5d55463cd55 Mon Sep 17 00:00:00 2001 From: jignesh-crest Date: Wed, 15 Mar 2023 18:46:01 +0530 Subject: [PATCH 05/11] move get_pem_format to utils, rename and make initialize as getter method --- connectors/sources/google_cloud_storage.py | 28 ++++++----------- .../tests/test_google_cloud_storage.py | 31 ++----------------- connectors/tests/test_utils.py | 25 +++++++++++++++ connectors/utils.py | 16 ++++++++++ 4 files changed, 52 insertions(+), 48 deletions(-) diff --git a/connectors/sources/google_cloud_storage.py b/connectors/sources/google_cloud_storage.py index 145715d7b..c38d64d41 100644 --- a/connectors/sources/google_cloud_storage.py +++ b/connectors/sources/google_cloud_storage.py @@ -19,7 +19,7 @@ from connectors.logger import logger from connectors.source import BaseDataSource -from connectors.utils import TIKA_SUPPORTED_FILETYPES, convert_to_b64 +from connectors.utils import TIKA_SUPPORTED_FILETYPES, convert_to_b64, get_pem_format CLOUD_STORAGE_READ_ONLY_SCOPE = "https://www.googleapis.com/auth/devstorage.read_only" CLOUD_STORAGE_BASE_URL = "https://console.cloud.google.com/storage/browser/_details/" @@ -201,23 +201,12 @@ async def _api_call( ) await asyncio.sleep(DEFAULT_WAIT_MULTIPLIER**retry_counter) - def get_pem_format(self, private_key, max_split=-1): - """Convert key into PEM format. - - Args: - private_key (str): Private_key in raw format. - max_split (int): Specifies how many splits to do. Defaults to -1. + def get_service_account_credentials(self): + """Initialize and return the ServiceAccountCreds Returns: - string: PEM format + ServiceAccountCreds: An instance of the ServiceAccountCreds. """ - private_key = private_key.replace(" ", "\n") - private_key = " ".join(private_key.split("\n", max_split)) - private_key = " ".join(private_key.rsplit("\n", max_split)) - return private_key - - def _initialize_configurations(self): - """Initialize the ServiceAccountCreds""" if self.service_account_credentials is not None: return @@ -227,8 +216,8 @@ def _initialize_configurations(self): json_credentials.get("private_key") and "\n" not in json_credentials["private_key"] ): - json_credentials["private_key"] = self.get_pem_format( - private_key=json_credentials["private_key"].strip(), + json_credentials["private_key"] = get_pem_format( + key=json_credentials["private_key"].strip(), max_split=2, ) @@ -237,6 +226,7 @@ def _initialize_configurations(self): **json_credentials, ) self.user_project_id = self.service_account_credentials.project_id + return self.service_account_credentials async def ping(self): """Verify the connection with Google Cloud Storage""" @@ -244,7 +234,7 @@ async def ping(self): if RUNNING_FTEST: return - self._initialize_configurations() + self.get_service_account_credentials() try: await anext( self._api_call( @@ -386,7 +376,7 @@ async def get_docs(self, filtering=None): Yields: dictionary: Documents from Google Cloud Storage. """ - self._initialize_configurations() + self.get_service_account_credentials() async for buckets in self.fetch_buckets(): if not buckets.get("items"): continue diff --git a/connectors/sources/tests/test_google_cloud_storage.py b/connectors/sources/tests/test_google_cloud_storage.py index b3df2adde..f8c2c8dd0 100644 --- a/connectors/sources/tests/test_google_cloud_storage.py +++ b/connectors/sources/tests/test_google_cloud_storage.py @@ -32,7 +32,7 @@ def get_mocked_source_object(): {"service_account_credentials": SERVICE_ACCOUNT_CREDENTIALS, "retry_count": 0} ) mocked_gcs_object = GoogleCloudStorageDataSource(configuration=configuration) - mocked_gcs_object._initialize_configurations() + mocked_gcs_object.get_service_account_credentials() return mocked_gcs_object @@ -73,9 +73,7 @@ async def test_empty_configuration(): @pytest.mark.asyncio -async def test_invalid_configuration(): - """Tests that the service account credential is in invalid json format""" - +async def test_raise_on_invalid_configuration(): # Setup configuration = DataSourceConfiguration( {"service_account_credentials": "{'abc':'bcd','cd'}"} @@ -520,28 +518,3 @@ async def test_api_call_for_attribute_error(catch_stdout, patch_logger): userProject=mocked_gcs_object.user_project_id, ): print("Method called successfully....") - - -def test_get_pem_format(): - """This function tests prepare private key with dummy values""" - # Setup - expected_formated_pem_key = """-----BEGIN PRIVATE KEY----- -PrivateKey ------END PRIVATE KEY-----""" - source = get_mocked_source_object() - private_key = "-----BEGIN PRIVATE KEY----- PrivateKey -----END PRIVATE KEY-----" - - # Execute - formated_privat_key = source.get_pem_format(private_key, max_split=2) - assert formated_privat_key == expected_formated_pem_key - - # Setup - expected_formated_certificate = """-----BEGIN CERTIFICATE----- -Certificate1 -Certificate2 ------END CERTIFICATE-----""" - private_key = "-----BEGIN CERTIFICATE----- Certificate1 Certificate2 -----END CERTIFICATE-----" - - # Execute - formated_privat_key = source.get_pem_format(private_key, max_split=1) - assert formated_privat_key == expected_formated_certificate diff --git a/connectors/tests/test_utils.py b/connectors/tests/test_utils.py index 7d3d2f9df..760dd9201 100644 --- a/connectors/tests/test_utils.py +++ b/connectors/tests/test_utils.py @@ -27,6 +27,7 @@ RetryStrategy, convert_to_b64, get_base64_value, + get_pem_format, get_size, next_run, retryable, @@ -336,3 +337,27 @@ async def does_not_raise(): # would fail, if retried once (retry_interval = 5 seconds). Explicit time boundary for this test: 1 second await does_not_raise() + + +def test_get_pem_format(): + """This function tests prepare private key and certificate with dummy values""" + # Setup + expected_formated_pem_key = """-----BEGIN PRIVATE KEY----- +PrivateKey +-----END PRIVATE KEY-----""" + private_key = "-----BEGIN PRIVATE KEY----- PrivateKey -----END PRIVATE KEY-----" + + # Execute + formated_privat_key = get_pem_format(key=private_key, max_split=2) + assert formated_privat_key == expected_formated_pem_key + + # Setup + expected_formated_certificate = """-----BEGIN CERTIFICATE----- +Certificate1 +Certificate2 +-----END CERTIFICATE-----""" + certificate = "-----BEGIN CERTIFICATE----- Certificate1 Certificate2 -----END CERTIFICATE-----" + + # Execute + formated_certificate = get_pem_format(key=certificate, max_split=1) + assert formated_certificate == expected_formated_certificate diff --git a/connectors/utils.py b/connectors/utils.py index 4f4fc49ea..151f6a6be 100644 --- a/connectors/utils.py +++ b/connectors/utils.py @@ -399,3 +399,19 @@ async def func_to_execute(*args, **kwargs): return func_to_execute return wrapper + + +def get_pem_format(key, max_split=-1): + """Convert key into PEM format. + + Args: + key (str): Key in raw format. + max_split (int): Specifies how many splits to do. Defaults to -1. + + Returns: + string: PEM format + """ + key = key.replace(" ", "\n") + key = " ".join(key.split("\n", max_split)) + key = " ".join(key.rsplit("\n", max_split)) + return key From ca65deb91b0f3525eba5438c641ed3cfbd4b51bc Mon Sep 17 00:00:00 2001 From: jignesh-crest Date: Thu, 16 Mar 2023 17:09:28 +0530 Subject: [PATCH 06/11] implement client class for abstracting api calls --- connectors/sources/google_cloud_storage.py | 193 ++++++++++-------- .../tests/test_google_cloud_storage.py | 8 +- 2 files changed, 107 insertions(+), 94 deletions(-) diff --git a/connectors/sources/google_cloud_storage.py b/connectors/sources/google_cloud_storage.py index c38d64d41..c4992c5eb 100644 --- a/connectors/sources/google_cloud_storage.py +++ b/connectors/sources/google_cloud_storage.py @@ -59,76 +59,22 @@ ) -class GoogleCloudStorageDataSource(BaseDataSource): - """Google Cloud Storage""" +class GoogleCloudStorageClient: + """A google client to handle api calls made to Google Cloud Storage.""" - name = "Google Cloud Storage" - service_type = "google_cloud_storage" - - def __init__(self, configuration): - """Set up the connection to the Google Cloud Storage Client. + def __init__(self, retry_count, json_credentials): + """Initialize the ServiceAccountCreds class using which api calls will be made. Args: - configuration (DataSourceConfiguration): Object of DataSourceConfiguration class. - """ - super().__init__(configuration=configuration) - self.retry_count = self.configuration["retry_count"] - self.enable_content_extraction = self.configuration["enable_content_extraction"] - - self.service_account_credentials = None - self.user_project_id = None - - @classmethod - def get_default_configuration(cls): - """Get the default configuration for Google Cloud Storage. - - Returns: - dictionary: Default configuration. - """ - default_credentials = { - "type": "service_account", - "project_id": "dummy_project_id", - "private_key_id": "abc", - "private_key": open(DEFAULT_PEM_FILE).read(), - "client_email": "123-abc@developer.gserviceaccount.com", - "client_id": "123-abc.apps.googleusercontent.com", - "auth_uri": "https://accounts.google.com/o/oauth2/auth", - "token_uri": "http://localhost:443/token", - } - return { - "service_account_credentials": { - "value": json.dumps(default_credentials), - "label": "Google Cloud service account json", - "type": "str", - }, - "retry_count": { - "value": DEFAULT_RETRY_COUNT, - "label": "Maximum retries for failed requests", - "type": "int", - }, - "enable_content_extraction": { - "value": DEFAULT_CONTENT_EXTRACTION, - "label": "Enable content extraction (true/false)", - "type": "bool", - }, - } - - async def validate_config(self): - """Validates whether user inputs are valid or not for configuration field. - - Raises: - Exception: The service account json is empty. - Exception: The format of service account json is invalid. + retry_count (int): Maximum retries for the failed requests. + json_credentials (dict): Service account credentials json. """ - if ( - self.configuration["service_account_credentials"] == "" - or self.configuration["service_account_credentials"] is None - ): - raise Exception("Google Cloud service account json can't be empty.") - try: - json.loads(self.configuration["service_account_credentials"]) - except ValueError: - raise Exception("Google Cloud service account is not a valid JSON.") + self.retry_count = retry_count + self.service_account_credentials = ServiceAccountCreds( + scopes=[CLOUD_STORAGE_READ_ONLY_SCOPE], + **json_credentials, + ) + self.user_project_id = self.service_account_credentials.project_id async def _api_call( self, @@ -138,7 +84,7 @@ async def _api_call( full_response=False, **kwargs, ): - """Method for adding retries whenever exception raised during an api calls + """Make a GET call for Google Cloud Storage with retry for the failed API calls. Args: resource (aiogoogle.resource.Resource): Resource name for which api call will be made. @@ -180,6 +126,7 @@ async def _api_call( ) async for page_items in first_page_with_next_attached: yield page_items + retry_counter = 0 else: if sub_method: method_object = getattr(method_object, sub_method) @@ -201,14 +148,84 @@ async def _api_call( ) await asyncio.sleep(DEFAULT_WAIT_MULTIPLIER**retry_counter) - def get_service_account_credentials(self): - """Initialize and return the ServiceAccountCreds + +class GoogleCloudStorageDataSource(BaseDataSource): + """Google Cloud Storage""" + + name = "Google Cloud Storage" + service_type = "google_cloud_storage" + + def __init__(self, configuration): + """Set up the connection to the Google Cloud Storage Client. + + Args: + configuration (DataSourceConfiguration): Object of DataSourceConfiguration class. + """ + super().__init__(configuration=configuration) + self.enable_content_extraction = self.configuration["enable_content_extraction"] + + self.google_storage_client = None + + @classmethod + def get_default_configuration(cls): + """Get the default configuration for Google Cloud Storage. Returns: - ServiceAccountCreds: An instance of the ServiceAccountCreds. + dictionary: Default configuration. """ - if self.service_account_credentials is not None: - return + default_credentials = { + "type": "service_account", + "project_id": "dummy_project_id", + "private_key_id": "abc", + "private_key": open(DEFAULT_PEM_FILE).read(), + "client_email": "123-abc@developer.gserviceaccount.com", + "client_id": "123-abc.apps.googleusercontent.com", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "http://localhost:443/token", + } + return { + "service_account_credentials": { + "value": json.dumps(default_credentials), + "label": "Google Cloud service account json", + "type": "str", + }, + "retry_count": { + "value": DEFAULT_RETRY_COUNT, + "label": "Maximum retries for failed requests", + "type": "int", + }, + "enable_content_extraction": { + "value": DEFAULT_CONTENT_EXTRACTION, + "label": "Enable content extraction (true/false)", + "type": "bool", + }, + } + + async def validate_config(self): + """Validates whether user inputs are valid or not for configuration field. + + Raises: + Exception: The service account json is empty. + Exception: The format of service account json is invalid. + """ + if ( + self.configuration["service_account_credentials"] == "" + or self.configuration["service_account_credentials"] is None + ): + raise Exception("Google Cloud service account json can't be empty.") + try: + json.loads(self.configuration["service_account_credentials"]) + except ValueError: + raise Exception("Google Cloud service account is not a valid JSON.") + + def get_storage_client(self): + """Initialize and return the GoogleCloudStorageClient + + Returns: + GoogleCloudStorageClient: An instance of the GoogleCloudStorageClient. + """ + if self.google_storage_client is not None: + return self.google_storage_client json_credentials = json.loads(self.configuration["service_account_credentials"]) @@ -221,27 +238,24 @@ def get_service_account_credentials(self): max_split=2, ) - self.service_account_credentials = ServiceAccountCreds( - scopes=[CLOUD_STORAGE_READ_ONLY_SCOPE], - **json_credentials, + self.google_storage_client = GoogleCloudStorageClient( + json_credentials=json_credentials, + retry_count=self.configuration["retry_count"], ) - self.user_project_id = self.service_account_credentials.project_id - return self.service_account_credentials + return self.google_storage_client async def ping(self): """Verify the connection with Google Cloud Storage""" - if RUNNING_FTEST: return - self.get_service_account_credentials() try: await anext( - self._api_call( + self.get_storage_client()._api_call( resource="projects", method="serviceAccount", sub_method="get", - projectId=self.user_project_id, + projectId=self.google_storage_client.user_project_id, ) ) logger.info("Successfully connected to the Google Cloud Storage.") @@ -255,12 +269,12 @@ async def fetch_buckets(self): Yields: Dictionary: Contains the list of fetched buckets from Google Cloud Storage. """ - async for bucket in self._api_call( + async for bucket in self.get_storage_client()._api_call( resource="buckets", method="list", full_response=True, - project=self.user_project_id, - userProject=self.user_project_id, + project=self.google_storage_client.user_project_id, + userProject=self.google_storage_client.user_project_id, ): yield bucket @@ -274,12 +288,12 @@ async def fetch_blobs(self, buckets): Dictionary: Contains the list of fetched blobs from Google Cloud Storage. """ for bucket in buckets.get("items", []): - async for blob in self._api_call( + async for blob in self.google_storage_client._api_call( resource="objects", method="list", full_response=True, bucket=bucket["id"], - userProject=self.user_project_id, + userProject=self.google_storage_client.user_project_id, ): yield blob @@ -298,7 +312,7 @@ def prepare_blob_document(self, blob): blob_name = urllib.parse.quote(blob_document["name"], safe="'") blob_document[ "url" - ] = f"{CLOUD_STORAGE_BASE_URL}{blob_document['bucket_name']}/{blob_name};tab=live_object?project={self.user_project_id}" + ] = f"{CLOUD_STORAGE_BASE_URL}{blob_document['bucket_name']}/{blob_name};tab=live_object?project={self.google_storage_client.user_project_id}" return blob_document def get_blob_document(self, blobs): @@ -346,13 +360,13 @@ async def get_content(self, blob, timestamp=None, doit=None): source_file_name = "" async with NamedTemporaryFile(mode="wb", delete=False) as async_buffer: await anext( - self._api_call( + self.google_storage_client._api_call( resource="objects", method="get", bucket=blob["bucket_name"], object=blob_name, alt="media", - userProject=self.user_project_id, + userProject=self.google_storage_client.user_project_id, pipe_to=async_buffer, ) ) @@ -376,7 +390,6 @@ async def get_docs(self, filtering=None): Yields: dictionary: Documents from Google Cloud Storage. """ - self.get_service_account_credentials() async for buckets in self.fetch_buckets(): if not buckets.get("items"): continue diff --git a/connectors/sources/tests/test_google_cloud_storage.py b/connectors/sources/tests/test_google_cloud_storage.py index f8c2c8dd0..0400b1f4c 100644 --- a/connectors/sources/tests/test_google_cloud_storage.py +++ b/connectors/sources/tests/test_google_cloud_storage.py @@ -32,7 +32,7 @@ def get_mocked_source_object(): {"service_account_credentials": SERVICE_ACCOUNT_CREDENTIALS, "retry_count": 0} ) mocked_gcs_object = GoogleCloudStorageDataSource(configuration=configuration) - mocked_gcs_object.get_service_account_credentials() + mocked_gcs_object.get_storage_client() return mocked_gcs_object @@ -399,7 +399,7 @@ async def test_get_content(): Aiogoogle, "as_service_account", return_value=blob_content_response ): google_client = Aiogoogle( - service_account_creds=mocked_gcs_object.service_account_credentials + service_account_creds=mocked_gcs_object.google_storage_client.service_account_credentials ) storage_client = await google_client.discover( api_name=API_NAME, api_version=API_VERSION @@ -438,7 +438,7 @@ async def test_get_content_when_type_not_supported(): # Execute and Assert google_client = Aiogoogle( - service_account_creds=mocked_gcs_object.service_account_credentials + service_account_creds=mocked_gcs_object.google_storage_client.service_account_credentials ) storage_client = await google_client.discover( api_name=API_NAME, api_version=API_VERSION @@ -482,7 +482,7 @@ async def test_get_content_when_file_size_is_large(catch_stdout, patch_logger): # Execute and Assert google_client = Aiogoogle( - service_account_creds=mocked_gcs_object.service_account_credentials + service_account_creds=mocked_gcs_object.google_storage_client.service_account_credentials ) storage_client = await google_client.discover( api_name=API_NAME, api_version=API_VERSION From fb25b6ee8c21a1c0f149048a4b71b9e062407239 Mon Sep 17 00:00:00 2001 From: jignesh-crest <95845553+jignesh-crest@users.noreply.github.com> Date: Thu, 16 Mar 2023 18:54:09 +0530 Subject: [PATCH 07/11] Use the getter instead of relying on the initialization of the storage client. Co-authored-by: Tim Grein --- connectors/sources/google_cloud_storage.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/connectors/sources/google_cloud_storage.py b/connectors/sources/google_cloud_storage.py index c4992c5eb..fbde7499a 100644 --- a/connectors/sources/google_cloud_storage.py +++ b/connectors/sources/google_cloud_storage.py @@ -255,7 +255,7 @@ async def ping(self): resource="projects", method="serviceAccount", sub_method="get", - projectId=self.google_storage_client.user_project_id, + projectId=self.get_storage_client().user_project_id, ) ) logger.info("Successfully connected to the Google Cloud Storage.") @@ -288,7 +288,7 @@ async def fetch_blobs(self, buckets): Dictionary: Contains the list of fetched blobs from Google Cloud Storage. """ for bucket in buckets.get("items", []): - async for blob in self.google_storage_client._api_call( + async for blob in self.get_storage_client()._api_call( resource="objects", method="list", full_response=True, @@ -312,7 +312,7 @@ def prepare_blob_document(self, blob): blob_name = urllib.parse.quote(blob_document["name"], safe="'") blob_document[ "url" - ] = f"{CLOUD_STORAGE_BASE_URL}{blob_document['bucket_name']}/{blob_name};tab=live_object?project={self.google_storage_client.user_project_id}" + ] = f"{CLOUD_STORAGE_BASE_URL}{blob_document['bucket_name']}/{blob_name};tab=live_object?project={self.get_storage_client().user_project_id}" return blob_document def get_blob_document(self, blobs): @@ -360,7 +360,7 @@ async def get_content(self, blob, timestamp=None, doit=None): source_file_name = "" async with NamedTemporaryFile(mode="wb", delete=False) as async_buffer: await anext( - self.google_storage_client._api_call( + self.get_storage_client()._api_call( resource="objects", method="get", bucket=blob["bucket_name"], From d59fa1fd1ae36386c773f7e4551fd87c3ed998e4 Mon Sep 17 00:00:00 2001 From: jignesh-crest Date: Fri, 17 Mar 2023 13:06:09 +0530 Subject: [PATCH 08/11] add tests for the get_client and use getter for google client --- connectors/sources/google_cloud_storage.py | 8 +++---- .../tests/test_google_cloud_storage.py | 22 +++++++++++++------ 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/connectors/sources/google_cloud_storage.py b/connectors/sources/google_cloud_storage.py index fbde7499a..800381479 100644 --- a/connectors/sources/google_cloud_storage.py +++ b/connectors/sources/google_cloud_storage.py @@ -273,8 +273,8 @@ async def fetch_buckets(self): resource="buckets", method="list", full_response=True, - project=self.google_storage_client.user_project_id, - userProject=self.google_storage_client.user_project_id, + project=self.get_storage_client().user_project_id, + userProject=self.get_storage_client().user_project_id, ): yield bucket @@ -293,7 +293,7 @@ async def fetch_blobs(self, buckets): method="list", full_response=True, bucket=bucket["id"], - userProject=self.google_storage_client.user_project_id, + userProject=self.get_storage_client().user_project_id, ): yield blob @@ -366,7 +366,7 @@ async def get_content(self, blob, timestamp=None, doit=None): bucket=blob["bucket_name"], object=blob_name, alt="media", - userProject=self.google_storage_client.user_project_id, + userProject=self.get_storage_client().user_project_id, pipe_to=async_buffer, ) ) diff --git a/connectors/sources/tests/test_google_cloud_storage.py b/connectors/sources/tests/test_google_cloud_storage.py index 0400b1f4c..eb9668e9a 100644 --- a/connectors/sources/tests/test_google_cloud_storage.py +++ b/connectors/sources/tests/test_google_cloud_storage.py @@ -32,7 +32,6 @@ def get_mocked_source_object(): {"service_account_credentials": SERVICE_ACCOUNT_CREDENTIALS, "retry_count": 0} ) mocked_gcs_object = GoogleCloudStorageDataSource(configuration=configuration) - mocked_gcs_object.get_storage_client() return mocked_gcs_object @@ -399,7 +398,7 @@ async def test_get_content(): Aiogoogle, "as_service_account", return_value=blob_content_response ): google_client = Aiogoogle( - service_account_creds=mocked_gcs_object.google_storage_client.service_account_credentials + service_account_creds=mocked_gcs_object.get_storage_client().service_account_credentials ) storage_client = await google_client.discover( api_name=API_NAME, api_version=API_VERSION @@ -438,7 +437,7 @@ async def test_get_content_when_type_not_supported(): # Execute and Assert google_client = Aiogoogle( - service_account_creds=mocked_gcs_object.google_storage_client.service_account_credentials + service_account_creds=mocked_gcs_object.get_storage_client().service_account_credentials ) storage_client = await google_client.discover( api_name=API_NAME, api_version=API_VERSION @@ -482,7 +481,7 @@ async def test_get_content_when_file_size_is_large(catch_stdout, patch_logger): # Execute and Assert google_client = Aiogoogle( - service_account_creds=mocked_gcs_object.google_storage_client.service_account_credentials + service_account_creds=mocked_gcs_object.get_storage_client().service_account_credentials ) storage_client = await google_client.discover( api_name=API_NAME, api_version=API_VERSION @@ -510,11 +509,20 @@ async def test_api_call_for_attribute_error(catch_stdout, patch_logger): # Execute with pytest.raises(AttributeError): - async for _ in mocked_gcs_object._api_call( + async for _ in mocked_gcs_object.get_storage_client()._api_call( resource="buckets_dummy", method="list", full_response=True, - project=mocked_gcs_object.user_project_id, - userProject=mocked_gcs_object.user_project_id, + project=mocked_gcs_object.get_storage_client().user_project_id, + userProject=mocked_gcs_object.get_storage_client().user_project_id, ): print("Method called successfully....") + + +def test_get_storage_client(): + """Test that the instance returned is always the same for the same datasource class.""" + mocked_gcs_object = get_mocked_source_object() + first_instance = mocked_gcs_object.get_storage_client() + second_instance = mocked_gcs_object.get_storage_client() + + assert first_instance == second_instance From 5dffd4611c1df7ce2b6b81ea1d7c0191323cc827 Mon Sep 17 00:00:00 2001 From: jignesh-crest Date: Fri, 17 Mar 2023 15:41:09 +0530 Subject: [PATCH 09/11] update test --- connectors/sources/tests/test_google_cloud_storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connectors/sources/tests/test_google_cloud_storage.py b/connectors/sources/tests/test_google_cloud_storage.py index eb9668e9a..04fb0051b 100644 --- a/connectors/sources/tests/test_google_cloud_storage.py +++ b/connectors/sources/tests/test_google_cloud_storage.py @@ -525,4 +525,4 @@ def test_get_storage_client(): first_instance = mocked_gcs_object.get_storage_client() second_instance = mocked_gcs_object.get_storage_client() - assert first_instance == second_instance + assert first_instance is second_instance From ea031baa37688e2b1ed8f8821dd39a1b59706d26 Mon Sep 17 00:00:00 2001 From: jignesh-crest Date: Mon, 20 Mar 2023 15:05:05 +0530 Subject: [PATCH 10/11] Expose api_call method as public method, reame method --- connectors/sources/google_cloud_storage.py | 10 +++---- .../tests/test_google_cloud_storage.py | 30 +++++++++---------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/connectors/sources/google_cloud_storage.py b/connectors/sources/google_cloud_storage.py index 800381479..329b3f312 100644 --- a/connectors/sources/google_cloud_storage.py +++ b/connectors/sources/google_cloud_storage.py @@ -76,7 +76,7 @@ def __init__(self, retry_count, json_credentials): ) self.user_project_id = self.service_account_credentials.project_id - async def _api_call( + async def api_call( self, resource, method, @@ -251,7 +251,7 @@ async def ping(self): try: await anext( - self.get_storage_client()._api_call( + self.get_storage_client().api_call( resource="projects", method="serviceAccount", sub_method="get", @@ -269,7 +269,7 @@ async def fetch_buckets(self): Yields: Dictionary: Contains the list of fetched buckets from Google Cloud Storage. """ - async for bucket in self.get_storage_client()._api_call( + async for bucket in self.get_storage_client().api_call( resource="buckets", method="list", full_response=True, @@ -288,7 +288,7 @@ async def fetch_blobs(self, buckets): Dictionary: Contains the list of fetched blobs from Google Cloud Storage. """ for bucket in buckets.get("items", []): - async for blob in self.get_storage_client()._api_call( + async for blob in self.get_storage_client().api_call( resource="objects", method="list", full_response=True, @@ -360,7 +360,7 @@ async def get_content(self, blob, timestamp=None, doit=None): source_file_name = "" async with NamedTemporaryFile(mode="wb", delete=False) as async_buffer: await anext( - self.get_storage_client()._api_call( + self.get_storage_client().api_call( resource="objects", method="get", bucket=blob["bucket_name"], diff --git a/connectors/sources/tests/test_google_cloud_storage.py b/connectors/sources/tests/test_google_cloud_storage.py index 04fb0051b..31b5702b0 100644 --- a/connectors/sources/tests/test_google_cloud_storage.py +++ b/connectors/sources/tests/test_google_cloud_storage.py @@ -22,7 +22,7 @@ API_VERSION = "v1" -def get_mocked_source_object(): +def get_gcs_source_object(): """Creates the mocked Google cloud storage object. Returns: @@ -96,7 +96,7 @@ async def test_ping_for_successful_connection(catch_stdout, patch_logger): "kind": "storage#serviceAccount", "email_address": "serviceaccount@email.com", } - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() as_service_account_response = asyncio.Future() as_service_account_response.set_result(expected_response) @@ -114,7 +114,7 @@ async def test_ping_for_failed_connection(catch_stdout, patch_logger): # Setup - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() # Execute with mock.patch.object( @@ -173,7 +173,7 @@ def test_get_blob_document(previous_documents_list, updated_documents_list): """ # Setup - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() # Execute and Assert assert updated_documents_list == list( @@ -186,7 +186,7 @@ async def test_fetch_buckets(): """Tests the method which lists the storage buckets available in Google Cloud Storage.""" # Setup - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() expected_response = { "kind": "storage#objects", "items": [ @@ -238,7 +238,7 @@ async def test_fetch_blobs(): """Tests the method responsible to yield blobs from Google Cloud Storage bucket.""" # Setup - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() expected_bucket_response = { "kind": "storage#objects", "items": [ @@ -287,7 +287,7 @@ async def test_get_docs(): """Tests the module responsible to fetch and yield blobs documents from Google Cloud Storage.""" # Setup - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() expected_response = { "kind": "storage#objects", "items": [ @@ -341,7 +341,7 @@ async def test_get_docs_when_no_buckets_present(): """ # Setup - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() expected_response = { "kind": "storage#objects", } @@ -368,7 +368,7 @@ async def test_get_content(): """Test the module responsible for fetching the content of the file if it is extractable.""" # Setup - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() blob_document = { "id": "bucket_1/blob_1/123123123", "component_count": None, @@ -416,7 +416,7 @@ async def test_get_content_when_type_not_supported(): """Test the module responsible for fetching the content of the file if it is not extractable or doit is not true.""" # Setup - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() blob_document = { "_id": "bucket_1/blob_1/123123123", "component_count": None, @@ -460,7 +460,7 @@ async def test_get_content_when_file_size_is_large(catch_stdout, patch_logger): """Test the module responsible for fetching the content of the file if it is not extractable or doit is not true.""" # Setup - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() blob_document = { "_id": "bucket_1/blob_1/123123123", "component_count": None, @@ -501,15 +501,15 @@ async def test_get_content_when_file_size_is_large(catch_stdout, patch_logger): @pytest.mark.asyncio async def test_api_call_for_attribute_error(catch_stdout, patch_logger): - """Tests the _api_call method when resource attribute is not present in the getattr.""" + """Tests the api_call method when resource attribute is not present in the getattr.""" # Setup - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() # Execute with pytest.raises(AttributeError): - async for _ in mocked_gcs_object.get_storage_client()._api_call( + async for _ in mocked_gcs_object.get_storage_client().api_call( resource="buckets_dummy", method="list", full_response=True, @@ -521,7 +521,7 @@ async def test_api_call_for_attribute_error(catch_stdout, patch_logger): def test_get_storage_client(): """Test that the instance returned is always the same for the same datasource class.""" - mocked_gcs_object = get_mocked_source_object() + mocked_gcs_object = get_gcs_source_object() first_instance = mocked_gcs_object.get_storage_client() second_instance = mocked_gcs_object.get_storage_client() From 4a0b747df0cbb6c2f74ad3f81399a4425959f63a Mon Sep 17 00:00:00 2001 From: jignesh-crest Date: Tue, 21 Mar 2023 16:20:27 +0530 Subject: [PATCH 11/11] cached property for google client, make it private --- connectors/sources/google_cloud_storage.py | 33 ++++++++----------- .../tests/test_google_cloud_storage.py | 21 ++++-------- 2 files changed, 20 insertions(+), 34 deletions(-) diff --git a/connectors/sources/google_cloud_storage.py b/connectors/sources/google_cloud_storage.py index 329b3f312..b9f486ef3 100644 --- a/connectors/sources/google_cloud_storage.py +++ b/connectors/sources/google_cloud_storage.py @@ -9,7 +9,7 @@ import json import os import urllib.parse -from functools import partial +from functools import cached_property, partial import aiofiles from aiofiles.os import remove @@ -164,8 +164,6 @@ def __init__(self, configuration): super().__init__(configuration=configuration) self.enable_content_extraction = self.configuration["enable_content_extraction"] - self.google_storage_client = None - @classmethod def get_default_configuration(cls): """Get the default configuration for Google Cloud Storage. @@ -218,15 +216,13 @@ async def validate_config(self): except ValueError: raise Exception("Google Cloud service account is not a valid JSON.") - def get_storage_client(self): + @cached_property + def _google_storage_client(self): """Initialize and return the GoogleCloudStorageClient Returns: GoogleCloudStorageClient: An instance of the GoogleCloudStorageClient. """ - if self.google_storage_client is not None: - return self.google_storage_client - json_credentials = json.loads(self.configuration["service_account_credentials"]) if ( @@ -238,11 +234,10 @@ def get_storage_client(self): max_split=2, ) - self.google_storage_client = GoogleCloudStorageClient( + return GoogleCloudStorageClient( json_credentials=json_credentials, retry_count=self.configuration["retry_count"], ) - return self.google_storage_client async def ping(self): """Verify the connection with Google Cloud Storage""" @@ -251,11 +246,11 @@ async def ping(self): try: await anext( - self.get_storage_client().api_call( + self._google_storage_client.api_call( resource="projects", method="serviceAccount", sub_method="get", - projectId=self.get_storage_client().user_project_id, + projectId=self._google_storage_client.user_project_id, ) ) logger.info("Successfully connected to the Google Cloud Storage.") @@ -269,12 +264,12 @@ async def fetch_buckets(self): Yields: Dictionary: Contains the list of fetched buckets from Google Cloud Storage. """ - async for bucket in self.get_storage_client().api_call( + async for bucket in self._google_storage_client.api_call( resource="buckets", method="list", full_response=True, - project=self.get_storage_client().user_project_id, - userProject=self.get_storage_client().user_project_id, + project=self._google_storage_client.user_project_id, + userProject=self._google_storage_client.user_project_id, ): yield bucket @@ -288,12 +283,12 @@ async def fetch_blobs(self, buckets): Dictionary: Contains the list of fetched blobs from Google Cloud Storage. """ for bucket in buckets.get("items", []): - async for blob in self.get_storage_client().api_call( + async for blob in self._google_storage_client.api_call( resource="objects", method="list", full_response=True, bucket=bucket["id"], - userProject=self.get_storage_client().user_project_id, + userProject=self._google_storage_client.user_project_id, ): yield blob @@ -312,7 +307,7 @@ def prepare_blob_document(self, blob): blob_name = urllib.parse.quote(blob_document["name"], safe="'") blob_document[ "url" - ] = f"{CLOUD_STORAGE_BASE_URL}{blob_document['bucket_name']}/{blob_name};tab=live_object?project={self.get_storage_client().user_project_id}" + ] = f"{CLOUD_STORAGE_BASE_URL}{blob_document['bucket_name']}/{blob_name};tab=live_object?project={self._google_storage_client.user_project_id}" return blob_document def get_blob_document(self, blobs): @@ -360,13 +355,13 @@ async def get_content(self, blob, timestamp=None, doit=None): source_file_name = "" async with NamedTemporaryFile(mode="wb", delete=False) as async_buffer: await anext( - self.get_storage_client().api_call( + self._google_storage_client.api_call( resource="objects", method="get", bucket=blob["bucket_name"], object=blob_name, alt="media", - userProject=self.get_storage_client().user_project_id, + userProject=self._google_storage_client.user_project_id, pipe_to=async_buffer, ) ) diff --git a/connectors/sources/tests/test_google_cloud_storage.py b/connectors/sources/tests/test_google_cloud_storage.py index 31b5702b0..c566d6130 100644 --- a/connectors/sources/tests/test_google_cloud_storage.py +++ b/connectors/sources/tests/test_google_cloud_storage.py @@ -398,7 +398,7 @@ async def test_get_content(): Aiogoogle, "as_service_account", return_value=blob_content_response ): google_client = Aiogoogle( - service_account_creds=mocked_gcs_object.get_storage_client().service_account_credentials + service_account_creds=mocked_gcs_object._google_storage_client.service_account_credentials ) storage_client = await google_client.discover( api_name=API_NAME, api_version=API_VERSION @@ -437,7 +437,7 @@ async def test_get_content_when_type_not_supported(): # Execute and Assert google_client = Aiogoogle( - service_account_creds=mocked_gcs_object.get_storage_client().service_account_credentials + service_account_creds=mocked_gcs_object._google_storage_client.service_account_credentials ) storage_client = await google_client.discover( api_name=API_NAME, api_version=API_VERSION @@ -481,7 +481,7 @@ async def test_get_content_when_file_size_is_large(catch_stdout, patch_logger): # Execute and Assert google_client = Aiogoogle( - service_account_creds=mocked_gcs_object.get_storage_client().service_account_credentials + service_account_creds=mocked_gcs_object._google_storage_client.service_account_credentials ) storage_client = await google_client.discover( api_name=API_NAME, api_version=API_VERSION @@ -509,20 +509,11 @@ async def test_api_call_for_attribute_error(catch_stdout, patch_logger): # Execute with pytest.raises(AttributeError): - async for _ in mocked_gcs_object.get_storage_client().api_call( + async for _ in mocked_gcs_object._google_storage_client.api_call( resource="buckets_dummy", method="list", full_response=True, - project=mocked_gcs_object.get_storage_client().user_project_id, - userProject=mocked_gcs_object.get_storage_client().user_project_id, + project=mocked_gcs_object._google_storage_client.user_project_id, + userProject=mocked_gcs_object._google_storage_client.user_project_id, ): print("Method called successfully....") - - -def test_get_storage_client(): - """Test that the instance returned is always the same for the same datasource class.""" - mocked_gcs_object = get_gcs_source_object() - first_instance = mocked_gcs_object.get_storage_client() - second_instance = mocked_gcs_object.get_storage_client() - - assert first_instance is second_instance