Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GCS]-Parse configurations from Kibana UI #493

Merged
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
d2aab73
parse configs from ui by formating the private-key
jignesh-crest Feb 16, 2023
bcc60b1
Add custom exception, remove unwanted shilds
jignesh-crest Feb 17, 2023
18b97e6
Merge branch 'main' into handle-configuration-from-ui
jignesh-crest Feb 20, 2023
47d0418
Merge branch 'main' of github.com:jignesh-crest/connectors-python int…
jignesh-crest Mar 2, 2023
1238165
use validate config from basesource
jignesh-crest Mar 2, 2023
fa2acc6
don't save the parsed json
jignesh-crest Mar 2, 2023
228a698
Merge branch 'main' into handle-configuration-from-ui
jignesh-crest Mar 10, 2023
598d2e6
Merge branch 'main' of github.com:jignesh-crest/connectors-python int…
jignesh-crest Mar 15, 2023
45c804c
move get_pem_format to utils, rename and make initialize as getter me…
jignesh-crest Mar 15, 2023
7f3df56
Merge branch 'main' of github.com:jignesh-crest/connectors-python int…
jignesh-crest Mar 16, 2023
ca65deb
implement client class for abstracting api calls
jignesh-crest Mar 16, 2023
fb25b6e
Use the getter instead of relying on the initialization of the storag…
jignesh-crest Mar 16, 2023
d59fa1f
add tests for the get_client and use getter for google client
jignesh-crest Mar 17, 2023
5dffd46
update test
jignesh-crest Mar 17, 2023
4c7e88b
Merge branch 'main' into handle-configuration-from-ui
jignesh-crest Mar 20, 2023
ea031ba
Expose api_call method as public method, reame method
jignesh-crest Mar 20, 2023
31c74f1
Merge branch 'main' into handle-configuration-from-ui
jignesh-crest Mar 21, 2023
4a0b747
cached property for google client, make it private
jignesh-crest Mar 21, 2023
0f1ad0e
Merge branch 'main' into handle-configuration-from-ui
jignesh-crest Mar 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 119 additions & 73 deletions connectors/sources/google_cloud_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from connectors.logger import logger
from connectors.source import BaseDataSource
from connectors.utils import TIKA_SUPPORTED_FILETYPES, convert_to_b64
from connectors.utils import TIKA_SUPPORTED_FILETYPES, convert_to_b64, get_pem_format

CLOUD_STORAGE_READ_ONLY_SCOPE = "https://www.googleapis.com/auth/devstorage.read_only"
CLOUD_STORAGE_BASE_URL = "https://console.cloud.google.com/storage/browser/_details/"
Expand Down Expand Up @@ -59,84 +59,32 @@
)


class GoogleCloudStorageDataSource(BaseDataSource):
"""Google Cloud Storage"""

name = "Google Cloud Storage"
service_type = "google_cloud_storage"
class GoogleCloudStorageClient:
"""A google client to handle api calls made to Google Cloud Storage."""

def __init__(self, configuration):
"""Set up the connection to the Google Cloud Storage Client.
def __init__(self, retry_count, json_credentials):
"""Initialize the ServiceAccountCreds class using which api calls will be made.

Args:
configuration (DataSourceConfiguration): Object of DataSourceConfiguration class.
retry_count (int): Maximum retries for the failed requests.
json_credentials (dict): Service account credentials json.
"""
super().__init__(configuration=configuration)
if not self.configuration["service_account_credentials"]:
raise Exception("service_account_credentials can't be empty.")

self.credentials = (
self.configuration["service_account_credentials"]
.strip()
.encode("unicode_escape")
.decode()
)
self.retry_count = retry_count
self.service_account_credentials = ServiceAccountCreds(
scopes=[CLOUD_STORAGE_READ_ONLY_SCOPE],
**json.loads(
self.configuration["service_account_credentials"]
if RUNNING_FTEST
else self.credentials
),
**json_credentials,
)
self.user_project_id = self.service_account_credentials.project_id
self.retry_count = self.configuration["retry_count"]
self.enable_content_extraction = self.configuration["enable_content_extraction"]

@classmethod
def get_default_configuration(cls):
"""Get the default configuration for Google Cloud Storage.

Returns:
dictionary: Default configuration.
"""
default_credentials = {
"type": "service_account",
"project_id": "dummy_project_id",
"private_key_id": "abc",
"private_key": open(DEFAULT_PEM_FILE).read(),
"client_email": "123-abc@developer.gserviceaccount.com",
"client_id": "123-abc.apps.googleusercontent.com",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "http://localhost:443/token",
}
return {
"service_account_credentials": {
"value": json.dumps(default_credentials),
"label": "JSON string for Google Cloud service account",
"type": "str",
},
"retry_count": {
"value": DEFAULT_RETRY_COUNT,
"label": "Maximum retries for failed requests",
"type": "int",
},
"enable_content_extraction": {
"value": DEFAULT_CONTENT_EXTRACTION,
"label": "Enable content extraction (true/false)",
"type": "bool",
},
}

async def _api_call(
async def api_call(
self,
resource,
method,
sub_method=None,
full_response=False,
**kwargs,
):
"""Method for adding retries whenever exception raised during an api calls
"""Make a GET call for Google Cloud Storage with retry for the failed API calls.

Args:
resource (aiogoogle.resource.Resource): Resource name for which api call will be made.
Expand Down Expand Up @@ -178,6 +126,7 @@ async def _api_call(
)
async for page_items in first_page_with_next_attached:
yield page_items
retry_counter = 0
else:
if sub_method:
method_object = getattr(method_object, sub_method)
Expand All @@ -199,17 +148,114 @@ async def _api_call(
)
await asyncio.sleep(DEFAULT_WAIT_MULTIPLIER**retry_counter)


class GoogleCloudStorageDataSource(BaseDataSource):
"""Google Cloud Storage"""

name = "Google Cloud Storage"
service_type = "google_cloud_storage"

def __init__(self, configuration):
"""Set up the connection to the Google Cloud Storage Client.

Args:
configuration (DataSourceConfiguration): Object of DataSourceConfiguration class.
"""
super().__init__(configuration=configuration)
self.enable_content_extraction = self.configuration["enable_content_extraction"]

self.google_storage_client = None
jignesh-crest marked this conversation as resolved.
Show resolved Hide resolved

@classmethod
def get_default_configuration(cls):
"""Get the default configuration for Google Cloud Storage.

Returns:
dictionary: Default configuration.
"""
default_credentials = {
"type": "service_account",
"project_id": "dummy_project_id",
"private_key_id": "abc",
"private_key": open(DEFAULT_PEM_FILE).read(),
"client_email": "123-abc@developer.gserviceaccount.com",
"client_id": "123-abc.apps.googleusercontent.com",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "http://localhost:443/token",
}
return {
"service_account_credentials": {
"value": json.dumps(default_credentials),
"label": "Google Cloud service account json",
"type": "str",
},
"retry_count": {
"value": DEFAULT_RETRY_COUNT,
"label": "Maximum retries for failed requests",
"type": "int",
},
"enable_content_extraction": {
"value": DEFAULT_CONTENT_EXTRACTION,
"label": "Enable content extraction (true/false)",
"type": "bool",
},
}

async def validate_config(self):
"""Validates whether user inputs are valid or not for configuration field.

Raises:
Exception: The service account json is empty.
Exception: The format of service account json is invalid.
"""
if (
self.configuration["service_account_credentials"] == ""
or self.configuration["service_account_credentials"] is None
):
raise Exception("Google Cloud service account json can't be empty.")
try:
json.loads(self.configuration["service_account_credentials"])
except ValueError:
raise Exception("Google Cloud service account is not a valid JSON.")

def get_storage_client(self):
jignesh-crest marked this conversation as resolved.
Show resolved Hide resolved
"""Initialize and return the GoogleCloudStorageClient

Returns:
GoogleCloudStorageClient: An instance of the GoogleCloudStorageClient.
"""
if self.google_storage_client is not None:
return self.google_storage_client

json_credentials = json.loads(self.configuration["service_account_credentials"])

if (
json_credentials.get("private_key")
and "\n" not in json_credentials["private_key"]
jignesh-crest marked this conversation as resolved.
Show resolved Hide resolved
):
json_credentials["private_key"] = get_pem_format(
key=json_credentials["private_key"].strip(),
max_split=2,
)

self.google_storage_client = GoogleCloudStorageClient(
json_credentials=json_credentials,
retry_count=self.configuration["retry_count"],
)
return self.google_storage_client

async def ping(self):
"""Verify the connection with Google Cloud Storage"""
if RUNNING_FTEST:
jignesh-crest marked this conversation as resolved.
Show resolved Hide resolved
return

try:
await anext(
self._api_call(
self.get_storage_client().api_call(
resource="projects",
method="serviceAccount",
sub_method="get",
projectId=self.user_project_id,
projectId=self.get_storage_client().user_project_id,
)
)
logger.info("Successfully connected to the Google Cloud Storage.")
Expand All @@ -223,12 +269,12 @@ async def fetch_buckets(self):
Yields:
Dictionary: Contains the list of fetched buckets from Google Cloud Storage.
"""
async for bucket in self._api_call(
async for bucket in self.get_storage_client().api_call(
resource="buckets",
method="list",
full_response=True,
project=self.user_project_id,
userProject=self.user_project_id,
project=self.get_storage_client().user_project_id,
userProject=self.get_storage_client().user_project_id,
):
yield bucket

Expand All @@ -242,12 +288,12 @@ async def fetch_blobs(self, buckets):
Dictionary: Contains the list of fetched blobs from Google Cloud Storage.
"""
for bucket in buckets.get("items", []):
async for blob in self._api_call(
async for blob in self.get_storage_client().api_call(
resource="objects",
method="list",
full_response=True,
bucket=bucket["id"],
userProject=self.user_project_id,
userProject=self.get_storage_client().user_project_id,
):
yield blob

Expand All @@ -266,7 +312,7 @@ def prepare_blob_document(self, blob):
blob_name = urllib.parse.quote(blob_document["name"], safe="'")
blob_document[
"url"
] = f"{CLOUD_STORAGE_BASE_URL}{blob_document['bucket_name']}/{blob_name};tab=live_object?project={self.user_project_id}"
] = f"{CLOUD_STORAGE_BASE_URL}{blob_document['bucket_name']}/{blob_name};tab=live_object?project={self.get_storage_client().user_project_id}"
return blob_document

def get_blob_document(self, blobs):
Expand Down Expand Up @@ -314,13 +360,13 @@ async def get_content(self, blob, timestamp=None, doit=None):
source_file_name = ""
async with NamedTemporaryFile(mode="wb", delete=False) as async_buffer:
await anext(
self._api_call(
self.get_storage_client().api_call(
resource="objects",
method="get",
bucket=blob["bucket_name"],
object=blob_name,
alt="media",
userProject=self.user_project_id,
userProject=self.get_storage_client().user_project_id,
pipe_to=async_buffer,
)
)
Expand Down
Loading