Skip to content

Commit

Permalink
[GCS]-Parse configurations from Kibana UI (#493)
Browse files: browse the repository at this point in the history
(cherry picked from commit 2144c0f)
  • Loading branch information
jignesh-crest committed Mar 21, 2023
1 parent f41c8a1 commit c704f86
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 95 deletions.
189 changes: 115 additions & 74 deletions connectors/sources/google_cloud_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import json
import os
import urllib.parse
from functools import partial
from functools import cached_property, partial

import aiofiles
from aiofiles.os import remove
Expand All @@ -19,7 +19,7 @@

from connectors.logger import logger
from connectors.source import BaseDataSource
from connectors.utils import TIKA_SUPPORTED_FILETYPES, convert_to_b64
from connectors.utils import TIKA_SUPPORTED_FILETYPES, convert_to_b64, get_pem_format

CLOUD_STORAGE_READ_ONLY_SCOPE = "https://www.googleapis.com/auth/devstorage.read_only"
CLOUD_STORAGE_BASE_URL = "https://console.cloud.google.com/storage/browser/_details/"
Expand Down Expand Up @@ -59,84 +59,32 @@
)


class GoogleCloudStorageDataSource(BaseDataSource):
"""Google Cloud Storage"""
class GoogleCloudStorageClient:
"""A google client to handle api calls made to Google Cloud Storage."""

name = "Google Cloud Storage"
service_type = "google_cloud_storage"

def __init__(self, configuration):
"""Set up the connection to the Google Cloud Storage Client.
def __init__(self, retry_count, json_credentials):
"""Initialize the ServiceAccountCreds class using which api calls will be made.
Args:
configuration (DataSourceConfiguration): Object of DataSourceConfiguration class.
retry_count (int): Maximum retries for the failed requests.
json_credentials (dict): Service account credentials json.
"""
super().__init__(configuration=configuration)
if not self.configuration["service_account_credentials"]:
raise Exception("service_account_credentials can't be empty.")

self.credentials = (
self.configuration["service_account_credentials"]
.strip()
.encode("unicode_escape")
.decode()
)
self.retry_count = retry_count
self.service_account_credentials = ServiceAccountCreds(
scopes=[CLOUD_STORAGE_READ_ONLY_SCOPE],
**json.loads(
self.configuration["service_account_credentials"]
if RUNNING_FTEST
else self.credentials
),
**json_credentials,
)
self.user_project_id = self.service_account_credentials.project_id
self.retry_count = self.configuration["retry_count"]
self.enable_content_extraction = self.configuration["enable_content_extraction"]

@classmethod
def get_default_configuration(cls):
    """Get the default configuration for Google Cloud Storage.

    The default service account credentials are placeholder values; only the
    private key is read from disk, from the file at DEFAULT_PEM_FILE.

    Returns:
        dictionary: Default configuration keyed by field name. Each entry
            holds the field's "value", "label" and "type".
    """
    # Read the key inside a context manager so the file handle is closed
    # deterministically instead of being left to the garbage collector.
    with open(DEFAULT_PEM_FILE) as pem_file:
        default_private_key = pem_file.read()

    default_credentials = {
        "type": "service_account",
        "project_id": "dummy_project_id",
        "private_key_id": "abc",
        "private_key": default_private_key,
        "client_email": "123-abc@developer.gserviceaccount.com",
        "client_id": "123-abc.apps.googleusercontent.com",
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "http://localhost:443/token",
    }
    return {
        "service_account_credentials": {
            # The credentials are stored as a JSON string, not a dict.
            "value": json.dumps(default_credentials),
            "label": "JSON string for Google Cloud service account",
            "type": "str",
        },
        "retry_count": {
            "value": DEFAULT_RETRY_COUNT,
            "label": "Maximum retries for failed requests",
            "type": "int",
        },
        "enable_content_extraction": {
            "value": DEFAULT_CONTENT_EXTRACTION,
            "label": "Enable content extraction (true/false)",
            "type": "bool",
        },
    }

async def _api_call(
async def api_call(
self,
resource,
method,
sub_method=None,
full_response=False,
**kwargs,
):
"""Method for adding retries whenever exception raised during an api calls
"""Make a GET call for Google Cloud Storage with retry for the failed API calls.
Args:
resource (aiogoogle.resource.Resource): Resource name for which api call will be made.
Expand Down Expand Up @@ -178,6 +126,7 @@ async def _api_call(
)
async for page_items in first_page_with_next_attached:
yield page_items
retry_counter = 0
else:
if sub_method:
method_object = getattr(method_object, sub_method)
Expand All @@ -199,17 +148,109 @@ async def _api_call(
)
await asyncio.sleep(DEFAULT_WAIT_MULTIPLIER**retry_counter)


class GoogleCloudStorageDataSource(BaseDataSource):
"""Google Cloud Storage"""

name = "Google Cloud Storage"
service_type = "google_cloud_storage"

def __init__(self, configuration):
    """Initialise the data source from its configuration.

    Args:
        configuration (DataSourceConfiguration): Object of DataSourceConfiguration class.
    """
    super().__init__(configuration=configuration)
    # Read through self.configuration (presumably populated by the base
    # class initialiser above -- confirm) rather than the raw argument.
    extraction_enabled = self.configuration["enable_content_extraction"]
    self.enable_content_extraction = extraction_enabled

@classmethod
def get_default_configuration(cls):
    """Get the default configuration for Google Cloud Storage.

    The default service account credentials are placeholder values; only the
    private key is read from disk, from the file at DEFAULT_PEM_FILE.

    Returns:
        dictionary: Default configuration keyed by field name. Each entry
            holds the field's "value", "label" and "type".
    """
    # Read the key inside a context manager so the file handle is closed
    # deterministically instead of being left to the garbage collector.
    with open(DEFAULT_PEM_FILE) as pem_file:
        default_private_key = pem_file.read()

    default_credentials = {
        "type": "service_account",
        "project_id": "dummy_project_id",
        "private_key_id": "abc",
        "private_key": default_private_key,
        "client_email": "123-abc@developer.gserviceaccount.com",
        "client_id": "123-abc.apps.googleusercontent.com",
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "http://localhost:443/token",
    }
    return {
        "service_account_credentials": {
            # The credentials are stored as a JSON string, not a dict.
            "value": json.dumps(default_credentials),
            "label": "Google Cloud service account json",
            "type": "str",
        },
        "retry_count": {
            "value": DEFAULT_RETRY_COUNT,
            "label": "Maximum retries for failed requests",
            "type": "int",
        },
        "enable_content_extraction": {
            "value": DEFAULT_CONTENT_EXTRACTION,
            "label": "Enable content extraction (true/false)",
            "type": "bool",
        },
    }

async def validate_config(self):
    """Validate the user-supplied service account credentials field.

    Raises:
        Exception: The service account json is empty.
        Exception: The service account json cannot be parsed as JSON; the
            underlying parse error is attached as the exception's cause.
        Exception: The service account json parses, but is not a JSON object.
    """
    credentials = self.configuration["service_account_credentials"]
    if credentials == "" or credentials is None:
        raise Exception("Google Cloud service account json can't be empty.")

    try:
        parsed_credentials = json.loads(credentials)
    except ValueError as exception:
        # Chain the parse error so the position of the syntax problem is
        # not lost from the traceback.
        raise Exception(
            "Google Cloud service account is not a valid JSON."
        ) from exception

    # Service account credentials are a set of named fields, so valid JSON
    # that is a scalar or an array (e.g. "123" or "[]") is rejected here
    # instead of failing later when the fields are consumed.
    if not isinstance(parsed_credentials, dict):
        raise Exception("Google Cloud service account json must be a JSON object.")

@cached_property
def _google_storage_client(self):
    """Build the GoogleCloudStorageClient for this data source.

    Because this is a cached_property, the client is constructed on first
    access and the same instance is returned afterwards.

    Returns:
        GoogleCloudStorageClient: An instance of the GoogleCloudStorageClient.
    """
    credentials = json.loads(self.configuration["service_account_credentials"])

    # A private key with no newline at all is passed through get_pem_format
    # (presumably to restore the PEM line breaks -- confirm against
    # connectors.utils) before it is handed to the client.
    private_key = credentials.get("private_key")
    if private_key and "\n" not in private_key:
        credentials["private_key"] = get_pem_format(
            key=private_key.strip(),
            max_split=2,
        )

    max_retries = self.configuration["retry_count"]
    return GoogleCloudStorageClient(
        json_credentials=credentials,
        retry_count=max_retries,
    )

async def ping(self):
"""Verify the connection with Google Cloud Storage"""
if RUNNING_FTEST:
return

try:
await anext(
self._api_call(
self._google_storage_client.api_call(
resource="projects",
method="serviceAccount",
sub_method="get",
projectId=self.user_project_id,
projectId=self._google_storage_client.user_project_id,
)
)
logger.info("Successfully connected to the Google Cloud Storage.")
Expand All @@ -223,12 +264,12 @@ async def fetch_buckets(self):
Yields:
Dictionary: Contains the list of fetched buckets from Google Cloud Storage.
"""
async for bucket in self._api_call(
async for bucket in self._google_storage_client.api_call(
resource="buckets",
method="list",
full_response=True,
project=self.user_project_id,
userProject=self.user_project_id,
project=self._google_storage_client.user_project_id,
userProject=self._google_storage_client.user_project_id,
):
yield bucket

Expand All @@ -242,12 +283,12 @@ async def fetch_blobs(self, buckets):
Dictionary: Contains the list of fetched blobs from Google Cloud Storage.
"""
for bucket in buckets.get("items", []):
async for blob in self._api_call(
async for blob in self._google_storage_client.api_call(
resource="objects",
method="list",
full_response=True,
bucket=bucket["id"],
userProject=self.user_project_id,
userProject=self._google_storage_client.user_project_id,
):
yield blob

Expand All @@ -266,7 +307,7 @@ def prepare_blob_document(self, blob):
blob_name = urllib.parse.quote(blob_document["name"], safe="'")
blob_document[
"url"
] = f"{CLOUD_STORAGE_BASE_URL}{blob_document['bucket_name']}/{blob_name};tab=live_object?project={self.user_project_id}"
] = f"{CLOUD_STORAGE_BASE_URL}{blob_document['bucket_name']}/{blob_name};tab=live_object?project={self._google_storage_client.user_project_id}"
return blob_document

def get_blob_document(self, blobs):
Expand Down Expand Up @@ -314,13 +355,13 @@ async def get_content(self, blob, timestamp=None, doit=None):
source_file_name = ""
async with NamedTemporaryFile(mode="wb", delete=False) as async_buffer:
await anext(
self._api_call(
self._google_storage_client.api_call(
resource="objects",
method="get",
bucket=blob["bucket_name"],
object=blob_name,
alt="media",
userProject=self.user_project_id,
userProject=self._google_storage_client.user_project_id,
pipe_to=async_buffer,
)
)
Expand Down
Loading

0 comments on commit c704f86

Please sign in to comment.