From db32826d9353cdeb2c59adf02fcfc9cffb0d62d8 Mon Sep 17 00:00:00 2001 From: Florian Valeye Date: Thu, 30 Dec 2021 12:43:47 +0100 Subject: [PATCH] Fix the DeltaTable external provider source and refactoring of the progression bar in the scanner for Python binding (#9) --- python/metadata_guardian/report.py | 32 ++++---- python/metadata_guardian/scanner.py | 78 +++++++++---------- .../source/external/deltatable_source.py | 4 +- .../external/external_metadata_source.py | 5 +- python/tests/test_scanner.py | 3 - 5 files changed, 59 insertions(+), 63 deletions(-) diff --git a/python/metadata_guardian/report.py b/python/metadata_guardian/report.py index 7c3cc73..d366378 100644 --- a/python/metadata_guardian/report.py +++ b/python/metadata_guardian/report.py @@ -32,9 +32,22 @@ def __init__(self) -> None: TimeRemainingColumn(), ) - def add_task( # type: ignore - self, item_name: str, source_type: str, total: int, current_item: str = "" - ) -> int: + def __enter__(self) -> "ProgressionBar": + super().__enter__() + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: # type: ignore + if self.task_id is not None: + super().update(self.task_id, current_item="Done") + super().__exit__(exc_type, exc_val, exc_tb) + + def add_task_with_item( + self, + item_name: Optional[str], + source_type: str, + total: int, + current_item: str = "", + ) -> None: """ Add task in the Progression Bar. :param item_name: the name of the item to search @@ -49,25 +62,16 @@ def add_task( # type: ignore current_item=current_item, ) self.task_id = task_id - return task_id - def update(self, current_item: str) -> None: # type: ignore + def update_item(self, current_item: str) -> None: """ - Update the task of the Progression Bar. + Update the current item of the task. :param current_item: the name of the current item :return: """ if self.task_id is not None: super().update(self.task_id, advance=1, current_item=current_item) - def terminate(self) -> None: - """ - Terminate the current task - :return: - """ - if self.task_id is not None: - super().update(self.task_id, current_item="Done") - @dataclass class ReportResults: diff --git a/python/metadata_guardian/scanner.py b/python/metadata_guardian/scanner.py index e39b098..a9de3a6 100644 --- a/python/metadata_guardian/scanner.py +++ b/python/metadata_guardian/scanner.py @@ -68,7 +68,6 @@ class ColumnScanner(Scanner): """Column Scanner instance.""" data_rules: DataRules - progress: ProgressionBar = ProgressionBar() def scan_local(self, source: LocalMetadataSource) -> MetadataGuardianReport: """ @@ -79,8 +78,7 @@ def scan_local(self, source: LocalMetadataSource) -> MetadataGuardianReport: logger.debug( f"[blue]Launch the metadata scanning of the local provider {source.type}" ) - with self.progress: - + with ProgressionBar() as progression_bar: report = MetadataGuardianReport( report_results=[ ReportResults( @@ -91,8 +89,7 @@ def scan_local(self, source: LocalMetadataSource) -> MetadataGuardianReport: ) ] ) - self.progress.update(current_item=source.local_path) - self.progress.terminate() + progression_bar.update_item(current_item=source.local_path) return report def scan_external( @@ -113,9 +110,9 @@ def scan_external( logger.debug( f"[blue]Launch the metadata scanning of the external provider {source.type} for the database {database_name}" ) - with self.progress: + with ProgressionBar() as progression_bar: if table_name: - self.progress.add_task( + progression_bar.add_task_with_item( item_name=database_name, source_type=source.type, total=1, @@ -135,13 +132,13 @@ def scan_external( ) ] ) - self.progress.update(current_item=table_name) + progression_bar.update_item(current_item=table_name) else: report = MetadataGuardianReport() table_names_list = source.get_table_names_list( database_name=database_name ) - self.progress.add_task( + progression_bar.add_task_with_item( item_name=database_name, source_type=source.type, total=len(table_names_list), @@ -164,8 +161,7 @@ def scan_external( ] ) ) - self.progress.update(current_item=table_name) - self.progress.terminate() + progression_bar.update_item(current_item=table_name) return report async def scan_external_async( @@ -193,7 +189,7 @@ async def scan_external_async( ) async def async_validate_words( - progress: ProgressionBar, table_name: str + progression_bar: ProgressionBar, table_name: str ) -> ReportResults: async with semaphore: loop = asyncio.get_event_loop() @@ -204,17 +200,18 @@ async def async_validate_words( table_name, include_comment, ) - progress.update(current_item=table_name) + progression_bar.update_item(current_item=table_name) return ReportResults( source=f"{database_name}.{table_name}", results=self.data_rules.validate_words(words=words), ) - with self.progress: - total = 1 + with ProgressionBar() as progression_bar: if table_name: tasks = [ - async_validate_words(progress=self.progress, table_name=table_name) + async_validate_words( + progression_bar=progression_bar, table_name=table_name + ) ] else: table_names_list = source.get_table_names_list( @@ -222,18 +219,18 @@ async def async_validate_words( ) tasks = [ - async_validate_words(progress=self.progress, table_name=table_name) + async_validate_words( + progression_bar=progression_bar, table_name=table_name + ) for table_name in table_names_list ] - total = len(table_names_list) - self.progress.add_task( + progression_bar.add_task_with_item( item_name=database_name, source_type=source.type, - total=total, + total=len(tasks), ) report_results = await asyncio.gather(*tasks) report = MetadataGuardianReport(report_results=report_results) - self.progress.terminate() return report @@ -242,7 +239,6 @@ class ContentFilesScanner: """Content Files Scanner instance.""" data_rules: DataRules - progress: ProgressionBar = ProgressionBar() def scan_local_file(self, path: str) -> MetadataGuardianReport: """ @@ -250,8 +246,14 @@ def scan_local_file(self, path: str) -> MetadataGuardianReport: :param path: the path of the file to scan :return: a Metadata Guardian report """ - with self.progress: - self.progress.add_task(item_name=path, source_type="files", total=1) + logger.debug( + f"[blue]Launch the metadata scanning the content of the file {path}" + ) + progression_bar: ProgressionBar + with ProgressionBar() as progression_bar: + progression_bar.add_task_with_item( + item_name=path, source_type="files", total=1 + ) report = MetadataGuardianReport( report_results=[ ReportResults( @@ -259,10 +261,9 @@ def scan_local_file(self, path: str) -> MetadataGuardianReport: ) ] ) - self.progress.update(current_item=path) - self.progress.terminate() + progression_bar.update_item(current_item=path) - return report + return report def scan_directory( self, directory_path: str, file_names_extension: str @@ -270,25 +271,16 @@ def scan_directory( """ Scan all the files inside directory path with the file name extension. :param directory_path: the directory path to scan - :param file_names_extension: the file name extension to include (without the .) + :param file_names_extension: the file name extension to include (without the ".") :return: a Metadata Guardian report """ logger.debug( f"[blue]Launch the metadata scanning the content of the files {directory_path} with extension{file_names_extension}" ) report = MetadataGuardianReport() - with self.progress: - for root, dirs, files in os.walk(directory_path): - self.progress.add_task( - item_name=root, - source_type="files", - total=len(files), - current_item="None", - ) - for name in files: - path = f"{root}/{name}" - if name.endswith(f".{file_names_extension}"): - report.append(other_report=self.scan_local_file(path=path)) - self.progress.update(current_item=name) - self.progress.terminate() - return report + for root, dirs, files in os.walk(directory_path): + for name in files: + path = f"{root}/{name}" + if name.endswith(f".{file_names_extension}"): + report.append(other_report=self.scan_local_file(path=path)) + return report diff --git a/python/metadata_guardian/source/external/deltatable_source.py b/python/metadata_guardian/source/external/deltatable_source.py index 1485c07..bb4240a 100644 --- a/python/metadata_guardian/source/external/deltatable_source.py +++ b/python/metadata_guardian/source/external/deltatable_source.py @@ -67,11 +67,11 @@ def get_column_names( def get_table_names_list(self, database_name: str) -> List[str]: """ - Get the table names list from the database. + Not relevant, just return the current Delta Table URI :param database_name: the database name :return: the list of the table names of the database """ - raise NotImplemented() + return [self.uri] @property def type(self) -> str: diff --git a/python/metadata_guardian/source/external/external_metadata_source.py b/python/metadata_guardian/source/external/external_metadata_source.py index 6af5f91..ac22245 100644 --- a/python/metadata_guardian/source/external/external_metadata_source.py +++ b/python/metadata_guardian/source/external/external_metadata_source.py @@ -12,7 +12,10 @@ class ExternalMetadataSource(MetadataSource): @abstractmethod def get_column_names( - self, database_name: str, table_name: str, include_comment: bool = False + self, + database_name: str, + table_name: str, + include_comment: bool = False, ) -> List[str]: """ Get the column names from the schema. diff --git a/python/tests/test_scanner.py b/python/tests/test_scanner.py index df903b1..fb42322 100644 --- a/python/tests/test_scanner.py +++ b/python/tests/test_scanner.py @@ -44,7 +44,6 @@ def test_column_scanner_table_name(mock_connection): ) assert report == expected - assert column_scanner.progress.task_id is not None @patch("snowflake.connector") @@ -84,7 +83,6 @@ def test_column_scanner_database_name(mock_connection): report = column_scanner.scan_external(database_name=database_name, source=source) assert report == expected - assert column_scanner.progress.task_id is not None @patch("snowflake.connector") @@ -126,7 +124,6 @@ def test_column_scanner_database_name_async(mock_connection): ) assert report == expected - assert column_scanner.progress.task_id is not None def test_local_directory_scan():