Skip to content

Commit

Permalink
Improve the connection performance of the external providers in Python
Browse files Browse the repository at this point in the history
  • Loading branch information
fvaleye committed Dec 29, 2021
1 parent 83eb9d1 commit 81827d6
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 67 deletions.
15 changes: 7 additions & 8 deletions python/metadata_guardian/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,22 +101,21 @@ def to_console(self) -> None:
title=":magnifying_glass_tilted_right: Metadata Guardian report",
show_header=True,
header_style="bold dim",
show_lines=True,
)
_table.add_column("Category", style="yellow", width=30)
_table.add_column("Source", style="cyan", width=30)
_table.add_column("Content", style="cyan", width=30)
_table.add_column("Name", style="magenta", width=30)
_table.add_column("Pattern")
_table.add_column("Documentation", width=50)
_table.add_column("Category", style="yellow", no_wrap=True)
_table.add_column("Source", style="cyan", no_wrap=True)
_table.add_column("Content", style="cyan", no_wrap=True)
_table.add_column("Name", style="magenta", no_wrap=True)
_table.add_column("Documentation")
for report in self.report_results:
for result in report.results:
for data_rule in result.data_rules:
_table.add_row(
result.category,
report.source,
result.content,
result.content.strip(),
data_rule.rule_name,
data_rule.regex_pattern,
data_rule.documentation,
)
if _table.rows:
Expand Down
50 changes: 29 additions & 21 deletions python/metadata_guardian/source/external/aws_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

AWS_INSTALLED = True
except ImportError:
logger.warning("AWS optional dependency is not installed.")
logger.debug("AWS optional dependency is not installed.")
AWS_INSTALLED = False


Expand All @@ -32,12 +32,12 @@ class AthenaSource(ExternalMetadataSource):
aws_access_key_id: Optional[str] = None
aws_secret_access_key: Optional[str] = None

def get_connection(self) -> AthenaClient:
def get_connection(self) -> None:
"""
Get Athena connection.
:return: a new Athena connection.
:return:
"""
return boto3.client(
self.connection = boto3.client(
"athena",
region_name=self.region_name,
aws_access_key_id=self.aws_access_key_id,
Expand All @@ -55,8 +55,9 @@ def get_column_names(
:return: the list of the column names
"""
try:
client = self.get_connection()
response = client.get_table_metadata(
if not self.connection:
self.get_connection()
response = self.connection.get_table_metadata(
CatalogName=self.catalog_name,
DatabaseName=database_name,
TableName=table_name,
Expand All @@ -65,7 +66,8 @@ def get_column_names(
for row in response["TableMetadata"]["Columns"]:
columns.append(row["Name"].lower())
if include_comment:
columns.append(row["Comment"].lower())
if "Comment" in row:
columns.append(row["Comment"].lower())
return columns
except botocore.exceptions.ClientError as error:
logger.exception(
Expand All @@ -80,16 +82,17 @@ def get_table_names_list(self, database_name: str) -> List[str]:
:return: the list of the table names of the database
"""
try:
client = self.get_connection()
if not self.connection:
self.get_connection()
table_names_list = list()
response = client.list_table_metadata(
response = self.connection.list_table_metadata(
CatalogName=self.catalog_name,
DatabaseName=database_name,
)
for table in response["TableMetadataList"]:
table_names_list.append(table["Name"])
while "NextToken" in response:
response = client.list_table_metadata(
response = self.connection.list_table_metadata(
CatalogName=self.catalog_name,
DatabaseName=database_name,
NextToken=response["NextToken"],
Expand All @@ -109,7 +112,7 @@ def type(self) -> str:
The type of the source.
:return: the name of the source.
"""
return "AWSAthena"
return "AWS Athena"

@dataclass
class GlueSource(ExternalMetadataSource):
Expand All @@ -119,12 +122,12 @@ class GlueSource(ExternalMetadataSource):
aws_access_key_id: Optional[str] = None
aws_secret_access_key: Optional[str] = None

def get_connection(self) -> GlueClient:
def get_connection(self) -> None:
"""
Get the Glue connection
:return: one Glue client
:return:
"""
return boto3.client(
self.connection = boto3.client(
"glue",
region_name=self.region_name,
aws_access_key_id=self.aws_access_key_id,
Expand All @@ -142,13 +145,17 @@ def get_column_names(
:return: the list of the column names
"""
try:
client = self.get_connection()
response = client.get_table(DatabaseName=database_name, Name=table_name)
if not self.connection:
self.get_connection()
response = self.connection.get_table(
DatabaseName=database_name, Name=table_name
)
columns = list()
for row in response["Table"]["StorageDescriptor"]["Columns"]:
columns.append(row["Name"].lower())
if include_comment:
columns.append(row["Comment"].lower())
if "Comment" in row:
columns.append(row["Comment"].lower())
return columns
except botocore.exceptions.ClientError as exception:
logger.exception(
Expand All @@ -163,15 +170,16 @@ def get_table_names_list(self, database_name: str) -> List[str]:
:return: the list of the table names of the database
"""
try:
client = self.get_connection()
if not self.connection:
self.get_connection()
table_names_list = list()
response = client.get_tables(
response = self.connection.get_tables(
DatabaseName=database_name,
)
for table in response["TableList"]:
table_names_list.append(table["Name"])
while "NextToken" in response:
response = client.list_table_metadata(
response = self.connection.get_tables(
DatabaseName=database_name, NextToken=response["NextToken"]
)
for table in response["TableList"]:
Expand All @@ -189,4 +197,4 @@ def type(self) -> str:
The type of the source.
:return: the name of the source.
"""
return "AWSGlue"
return "AWS Glue"
16 changes: 8 additions & 8 deletions python/metadata_guardian/source/external/deltatable_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

DELTA_LAKE_INSTALLED = True
except ImportError:
logger.warning("Delta Lake optional dependency is not installed.")
logger.debug("Delta Lake optional dependency is not installed.")
DELTA_LAKE_INSTALLED = False

if DELTA_LAKE_INSTALLED:
Expand All @@ -23,12 +23,12 @@ class DeltaTableSource(ExternalMetadataSource):
uri: str
data_catalog: DataCatalog = DataCatalog.AWS

def get_connection(self) -> Any:
def get_connection(self) -> None:
"""
Get the DeltaTable instance.
:return: the DeltaTable instance
:return:
"""
return DeltaTable(self.uri)
self.connection = DeltaTable(self.uri)

def get_column_names(
self,
Expand All @@ -45,14 +45,14 @@ def get_column_names(
"""
try:
if database_name and table_name:
delta_table = DeltaTable.from_data_catalog(
self.connection = DeltaTable.from_data_catalog(
data_catalog=self.data_catalog,
database_name=database_name,
table_name=table_name,
)
else:
delta_table = self.get_connection()
schema = delta_table.schema()
self.get_connection()
schema = self.connection.schema()
columns = list()
for field in schema.fields:
columns.append(field.name.lower())
Expand All @@ -79,4 +79,4 @@ def type(self) -> str:
The type of the source.
:return: the name of the source.
"""
return "DeltaTable"
return "Delta Table"
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import abstractmethod
from typing import Any, List
from typing import Any, List, Optional

from ...exceptions import MetadataGuardianException
from ..metadata_source import MetadataSource
Expand All @@ -8,6 +8,8 @@
class ExternalMetadataSource(MetadataSource):
"""ExternalMetadataSource Source."""

connection: Optional[Any] = None

@abstractmethod
def get_column_names(
self, database_name: str, table_name: str, include_comment: bool = False
Expand All @@ -31,10 +33,10 @@ def get_table_names_list(self, database_name: str) -> List[str]:
pass

@abstractmethod
def get_connection(self) -> Any:
def get_connection(self) -> None:
"""
Get the connection of the source.
:return: the source connection
:return:
"""
pass

Expand Down
24 changes: 13 additions & 11 deletions python/metadata_guardian/source/external/gcp_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

GCP_INSTALLED = True
except ImportError:
logger.warning("GCP optional dependency is not installed.")
logger.debug("GCP optional dependency is not installed.")
GCP_INSTALLED = False

if GCP_INSTALLED:
Expand All @@ -26,13 +26,13 @@ class BigQuerySource(ExternalMetadataSource):
project: Optional[str] = None
location: Optional[str] = None

def get_connection(self) -> Any:
def get_connection(self) -> None:
"""
Get the Big Query connection.
:return: a BigQuery Client
:return:
"""
try:
return bigquery.Client.from_service_account_json(
self.connection = bigquery.Client.from_service_account_json(
self.service_account_json_path,
project=self.project,
location=self.location,
Expand All @@ -45,23 +45,24 @@ def get_column_names(
self, database_name: str, table_name: str, include_comment: bool = False
) -> List[str]:
"""
Get column names from the table.
Get column names from the table of the dataset.
:param database_name: in that case the dataset
:param table_name: the table name
:param include_comment: include the comment
:return: the list of the column names
"""

try:
client = self.get_connection()
query_job = client.query(
if not self.connection:
self.get_connection()
query_job = self.connection.query(
f'SELECT column_name, description FROM `{database_name}.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS` WHERE table_name = "{table_name}"'
)
results = query_job.result()
columns = list()
for row in results:
columns.append(row.column_name.lower())
if include_comment:
if include_comment and row.description:
columns.append(row.description.lower())
return columns
except Exception as exception:
Expand All @@ -78,8 +79,9 @@ def get_table_names_list(self, database_name: str) -> List[str]:
"""

try:
client = self.get_connection()
query_job = client.query(
if not self.connection:
self.get_connection()
query_job = self.connection.query(
f"SELECT table_name FROM `{database_name}.INFORMATION_SCHEMA.TABLES`"
)
results = query_job.result()
Expand All @@ -99,4 +101,4 @@ def type(self) -> str:
The type of the source.
:return: the name of the source.
"""
return "BigQuery"
return "GCP BigQuery"
Loading

0 comments on commit 81827d6

Please sign in to comment.