From 46cdeefdeb6119fbe457ab4b5478d040950c213d Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Wed, 16 Feb 2022 19:07:37 +0530 Subject: [PATCH 01/16] Handled missing providers. --- src/astro/sql/operators/agnostic_save_file.py | 14 +++++++++++--- src/astro/sql/operators/sql_dataframe.py | 11 +++++++++-- src/astro/sql/operators/sql_decorator.py | 16 +++++++++++++--- src/astro/sql/operators/temp_hooks.py | 6 ++++-- src/astro/utils/bigquery_merge_func.py | 3 --- src/astro/utils/load_dataframe.py | 11 +++++++++-- src/astro/utils/postgres_merge_func.py | 5 ++++- src/astro/utils/postgres_transform.py | 5 ++++- src/astro/utils/schema_util.py | 8 ++++++++ 9 files changed, 62 insertions(+), 17 deletions(-) diff --git a/src/astro/sql/operators/agnostic_save_file.py b/src/astro/sql/operators/agnostic_save_file.py index 4a3b358e2..89c1dff95 100644 --- a/src/astro/sql/operators/agnostic_save_file.py +++ b/src/astro/sql/operators/agnostic_save_file.py @@ -22,9 +22,17 @@ import pandas as pd from airflow.hooks.base import BaseHook from airflow.models import BaseOperator, DagRun, TaskInstance -from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook -from airflow.providers.postgres.hooks.postgres import PostgresHook -from google.cloud.storage import Client + +try: + from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook +except ModuleNotFoundError: + from astro.utils.schema_util import RaiseException as BigQueryHook + +try: + from airflow.providers.postgres.hooks.postgres import PostgresHook +except ModuleNotFoundError: + from astro.utils.schema_util import RaiseException as PostgresHook + from smart_open import open from astro.sql.operators.temp_hooks import TempSnowflakeHook diff --git a/src/astro/sql/operators/sql_dataframe.py b/src/astro/sql/operators/sql_dataframe.py index 3038d2c8e..51f77d60c 100644 --- a/src/astro/sql/operators/sql_dataframe.py +++ b/src/astro/sql/operators/sql_dataframe.py @@ -19,8 +19,15 @@ import pandas as pd from airflow.decorators.base import DecoratedOperator from airflow.hooks.base import BaseHook -from airflow.providers.postgres.hooks.postgres import PostgresHook -from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook + +try: + from airflow.providers.postgres.hooks.postgres import PostgresHook +except ModuleNotFoundError: + from astro.utils.schema_util import RaiseException as PostgresHook +try: + from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook +except ModuleNotFoundError: + from astro.utils.schema_util import RaiseException as SnowflakeHook from astro.constants import DEFAULT_CHUNK_SIZE from astro.sql.table import Table, TempTable, create_table_name diff --git a/src/astro/sql/operators/sql_decorator.py b/src/astro/sql/operators/sql_decorator.py index a32c3fc32..2193703f0 100644 --- a/src/astro/sql/operators/sql_decorator.py +++ b/src/astro/sql/operators/sql_decorator.py @@ -21,9 +21,19 @@ from airflow.decorators.base import DecoratedOperator, task_decorator_factory from airflow.hooks.base import BaseHook from airflow.models import DagRun, TaskInstance -from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook -from airflow.providers.postgres.hooks.postgres import PostgresHook -from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook + +try: + from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook +except ModuleNotFoundError: + from astro.utils.schema_util import RaiseException as BigQueryHook +try: + from airflow.providers.postgres.hooks.postgres import PostgresHook +except ModuleNotFoundError: + from astro.utils.schema_util import RaiseException as PostgresHook +try: + from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook +except ModuleNotFoundError: + from astro.utils.schema_util import RaiseException as SnowflakeHook from airflow.utils.db import provide_session from sqlalchemy.sql.functions import Function diff --git a/src/astro/sql/operators/temp_hooks.py b/src/astro/sql/operators/temp_hooks.py index b9b1519f2..e18f9a26a 100644 --- a/src/astro/sql/operators/temp_hooks.py +++ b/src/astro/sql/operators/temp_hooks.py @@ -15,8 +15,10 @@ """ from urllib.parse import quote_plus -from airflow.providers.postgres.hooks.postgres import PostgresHook -from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook +try: + from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook +except ModuleNotFoundError: + from astro.utils.schema_util import RaiseException as SnowflakeHook class TempSnowflakeHook(SnowflakeHook): diff --git a/src/astro/utils/bigquery_merge_func.py b/src/astro/utils/bigquery_merge_func.py index 84bbcbb72..c69b3d9a3 100644 --- a/src/astro/utils/bigquery_merge_func.py +++ b/src/astro/utils/bigquery_merge_func.py @@ -13,9 +13,6 @@ See the License for the specific language governing permissions and limitations under the License. """ -from airflow.providers.postgres.hooks.postgres import PostgresHook -from psycopg2 import sql - from astro.sql.table import Table diff --git a/src/astro/utils/load_dataframe.py b/src/astro/utils/load_dataframe.py index 075d0c6cd..22757f063 100644 --- a/src/astro/utils/load_dataframe.py +++ b/src/astro/utils/load_dataframe.py @@ -15,8 +15,15 @@ """ from typing import Optional, Union -from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook -from airflow.providers.postgres.hooks.postgres import PostgresHook +try: + from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook +except ModuleNotFoundError: + from astro.utils.schema_util import RaiseException as BigQueryHook + +try: + from airflow.providers.postgres.hooks.postgres import PostgresHook +except ModuleNotFoundError: + from astro.utils.schema_util import RaiseException as BigQueryHook from pandas import DataFrame from pandas.io.sql import SQLDatabase from snowflake.connector.pandas_tools import write_pandas diff --git a/src/astro/utils/postgres_merge_func.py b/src/astro/utils/postgres_merge_func.py index 5aa586bb0..8dd65bbd4 100644 --- a/src/astro/utils/postgres_merge_func.py +++ b/src/astro/utils/postgres_merge_func.py @@ -13,7 +13,10 @@ See the License for the specific language governing permissions and limitations under the License. """ -from airflow.providers.postgres.hooks.postgres import PostgresHook +try: + from airflow.providers.postgres.hooks.postgres import PostgresHook +except ModuleNotFoundError: + from astro.utils.schema_util import RaiseException as PostgresHook from psycopg2 import sql from astro.sql.table import Table diff --git a/src/astro/utils/postgres_transform.py b/src/astro/utils/postgres_transform.py index 66b986a24..42be2eaa6 100644 --- a/src/astro/utils/postgres_transform.py +++ b/src/astro/utils/postgres_transform.py @@ -15,7 +15,10 @@ """ import inspect -from airflow.providers.postgres.hooks.postgres import PostgresHook +try: + from airflow.providers.postgres.hooks.postgres import PostgresHook +except ModuleNotFoundError: + from astro.utils.schema_util import RaiseException as PostgresHook from psycopg2.extensions import AsIs from astro.sql.table import Table diff --git a/src/astro/utils/schema_util.py b/src/astro/utils/schema_util.py index be19c2cdf..496ec91cb 100644 --- a/src/astro/utils/schema_util.py +++ b/src/astro/utils/schema_util.py @@ -50,3 +50,11 @@ def get_error_string_for_multiple_dbs(tables: List[Table]): :return: String: error string """ return f'Tables should belong to same db {", ".join([table.table_name for table in tables])}' + + +class RaiseException(object): + def __init__(self, module_name): + self.module_name = module_name + + def __getattr__(self, item): + raise RuntimeError(f"Error loading the module {self.module_name}") From f1279abb7daa99e46ab02ed0b7aaa6fd4a76c802 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Wed, 16 Feb 2022 20:50:20 +0530 Subject: [PATCH 02/16] Added optional dependencies in pyproject.toml --- pyproject.toml | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index e58234fc6..768e4b553 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,17 +16,10 @@ license = {file = "LICENSE"} requires-python = ">=3.7" dependencies = [ "apache-airflow >=2.0", - "apache-airflow-providers-postgres", - "apache-airflow-providers-snowflake", - "apache-airflow-providers-google", "python-frontmatter", "pandas >=1.3.4", - "s3fs", - "snowflake-sqlalchemy ==1.2.0", - "snowflake-connector-python[pandas]", - "smart-open[all]>=5.2.1", "SQLAlchemy==1.3.24", - "sqlalchemy-bigquery==1.3.0" + "apache-airflow-providers-postgres", ] keywords = ["airflow", "provider", "astronomer", "sql", "decorator"] @@ -46,6 +39,32 @@ tests = [ "pytest >=6.0", "pytest-dotenv", "requests-mock", + "apache-airflow-providers-google", + "sqlalchemy-bigquery==1.3.0", + "smart-open[all]>=5.2.1", + "apache-airflow-providers-snowflake", + "snowflake-sqlalchemy ==1.2.0", + "snowflake-connector-python[pandas]", + "apache-airflow-providers-postgres", + "s3fs", + "smart-open[s3]>=5.2.1", +] +gcs = [ + "apache-airflow-providers-google", + "sqlalchemy-bigquery==1.3.0", + "smart-open[gcs]>=5.2.1", +] +snowflake = [ + "apache-airflow-providers-snowflake", + "snowflake-sqlalchemy ==1.2.0", + "snowflake-connector-python[pandas]", +] +postgres = [ + "apache-airflow-providers-postgres", +] +aws = [ + "s3fs", + "smart-open[s3]>=5.2.1", ] [project.urls] From bf8c9c01fa1530ea88ec18c2825db21cce79f84d Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Thu, 17 Feb 2022 17:59:00 +0530 Subject: [PATCH 03/16] Added all option in pyproject.toml --- pyproject.toml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 768e4b553..45bce036b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -66,6 +66,20 @@ aws = [ "s3fs", "smart-open[s3]>=5.2.1", ] +all = [ + "pytest >=6.0", + "pytest-dotenv", + "requests-mock", + "apache-airflow-providers-google", + "sqlalchemy-bigquery==1.3.0", + "smart-open[all]>=5.2.1", + "apache-airflow-providers-snowflake", + "snowflake-sqlalchemy ==1.2.0", + "snowflake-connector-python[pandas]", + "apache-airflow-providers-postgres", + "s3fs", + "smart-open[s3]>=5.2.1", +] [project.urls] Home = "http://astronomer.io/" From 0f7c8bd2afb627cde35b5b055bb39953fc36c929 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Thu, 17 Feb 2022 18:25:06 +0530 Subject: [PATCH 04/16] Moved imports to a seprate files so they look easy on the eyes. --- src/astro/sql/operators/agnostic_save_file.py | 12 +----- src/astro/sql/operators/sql_dataframe.py | 10 +---- src/astro/sql/operators/sql_decorator.py | 14 +------ src/astro/sql/operators/temp_hooks.py | 5 +-- src/astro/utils/dependencies.py | 42 +++++++++++++++++++ src/astro/utils/load_dataframe.py | 10 +---- src/astro/utils/postgres_merge_func.py | 5 +-- src/astro/utils/postgres_transform.py | 5 +-- src/astro/utils/schema_util.py | 8 ---- 9 files changed, 49 insertions(+), 62 deletions(-) create mode 100644 src/astro/utils/dependencies.py diff --git a/src/astro/sql/operators/agnostic_save_file.py b/src/astro/sql/operators/agnostic_save_file.py index 89c1dff95..a2e6575b7 100644 --- a/src/astro/sql/operators/agnostic_save_file.py +++ b/src/astro/sql/operators/agnostic_save_file.py @@ -22,22 +22,12 @@ import pandas as pd from airflow.hooks.base import BaseHook from airflow.models import BaseOperator, DagRun, TaskInstance - -try: - from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook -except ModuleNotFoundError: - from astro.utils.schema_util import RaiseException as BigQueryHook - -try: - from airflow.providers.postgres.hooks.postgres import PostgresHook -except ModuleNotFoundError: - from astro.utils.schema_util import RaiseException as PostgresHook - from smart_open import open from astro.sql.operators.temp_hooks import TempSnowflakeHook from astro.sql.table import Table from astro.utils.cloud_storage_creds import gcs_client, s3fs_creds +from astro.utils.dependencies import BigQueryHook, PostgresHook from astro.utils.schema_util import get_table_name from astro.utils.task_id_helper import get_task_id diff --git a/src/astro/sql/operators/sql_dataframe.py b/src/astro/sql/operators/sql_dataframe.py index 51f77d60c..ac29ead45 100644 --- a/src/astro/sql/operators/sql_dataframe.py +++ b/src/astro/sql/operators/sql_dataframe.py @@ -20,17 +20,9 @@ from airflow.decorators.base import DecoratedOperator from airflow.hooks.base import BaseHook -try: - from airflow.providers.postgres.hooks.postgres import PostgresHook -except ModuleNotFoundError: - from astro.utils.schema_util import RaiseException as PostgresHook -try: - from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook -except ModuleNotFoundError: - from astro.utils.schema_util import RaiseException as SnowflakeHook - from astro.constants import DEFAULT_CHUNK_SIZE from astro.sql.table import Table, TempTable, create_table_name +from astro.utils.dependencies import PostgresHook, SnowflakeHook from astro.utils.load_dataframe import move_dataframe_to_sql from astro.utils.schema_util import get_schema diff --git a/src/astro/sql/operators/sql_decorator.py b/src/astro/sql/operators/sql_decorator.py index 2193703f0..a3acaf75e 100644 --- a/src/astro/sql/operators/sql_decorator.py +++ b/src/astro/sql/operators/sql_decorator.py @@ -21,24 +21,12 @@ from airflow.decorators.base import DecoratedOperator, task_decorator_factory from airflow.hooks.base import BaseHook from airflow.models import DagRun, TaskInstance - -try: - from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook -except ModuleNotFoundError: - from astro.utils.schema_util import RaiseException as BigQueryHook -try: - from airflow.providers.postgres.hooks.postgres import PostgresHook -except ModuleNotFoundError: - from astro.utils.schema_util import RaiseException as PostgresHook -try: - from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook -except ModuleNotFoundError: - from astro.utils.schema_util import RaiseException as SnowflakeHook from airflow.utils.db import provide_session from sqlalchemy.sql.functions import Function from astro.sql.table import Table, create_table_name from astro.utils import postgres_transform, snowflake_transform +from astro.utils.dependencies import BigQueryHook, PostgresHook, SnowflakeHook from astro.utils.load_dataframe import move_dataframe_to_sql from astro.utils.schema_util import get_schema, set_schema_query diff --git a/src/astro/sql/operators/temp_hooks.py b/src/astro/sql/operators/temp_hooks.py index e18f9a26a..e0180bee1 100644 --- a/src/astro/sql/operators/temp_hooks.py +++ b/src/astro/sql/operators/temp_hooks.py @@ -15,10 +15,7 @@ """ from urllib.parse import quote_plus -try: - from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook -except ModuleNotFoundError: - from astro.utils.schema_util import RaiseException as SnowflakeHook +from astro.utils.dependencies import SnowflakeHook class TempSnowflakeHook(SnowflakeHook): diff --git a/src/astro/utils/dependencies.py b/src/astro/utils/dependencies.py new file mode 100644 index 000000000..ff126ca38 --- /dev/null +++ b/src/astro/utils/dependencies.py @@ -0,0 +1,42 @@ +""" +Copyright Astronomer, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + + +class MissingPackage(object): + def __init__(self, module_name): + self.module_name = module_name + + def __getattr__(self, item): + raise RuntimeError( + f"Error loading the module {self.module_name}," + f" please make sure all the dependencies are installed" + ) + + +try: + from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook +except ModuleNotFoundError: + BigQueryHook = MissingPackage("airflow.providers.google.cloud.hooks.bigquery") + +try: + from airflow.providers.postgres.hooks.postgres import PostgresHook +except ModuleNotFoundError: + PostgresHook = MissingPackage("airflow.providers.postgres.hooks.postgres") + +try: + from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook +except ModuleNotFoundError: + SnowflakeHook = MissingPackage("airflow.providers.snowflake.hooks.snowflake") diff --git a/src/astro/utils/load_dataframe.py b/src/astro/utils/load_dataframe.py index 22757f063..a34cb1cc2 100644 --- a/src/astro/utils/load_dataframe.py +++ b/src/astro/utils/load_dataframe.py @@ -15,20 +15,12 @@ """ from typing import Optional, Union -try: - from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook -except ModuleNotFoundError: - from astro.utils.schema_util import RaiseException as BigQueryHook - -try: - from airflow.providers.postgres.hooks.postgres import PostgresHook -except ModuleNotFoundError: - from astro.utils.schema_util import RaiseException as BigQueryHook from pandas import DataFrame from pandas.io.sql import SQLDatabase from snowflake.connector.pandas_tools import write_pandas from astro.sql.operators.temp_hooks import TempSnowflakeHook +from astro.utils.dependencies import BigQueryHook, PostgresHook from astro.utils.schema_util import set_schema_query diff --git a/src/astro/utils/postgres_merge_func.py b/src/astro/utils/postgres_merge_func.py index 8dd65bbd4..989332a8b 100644 --- a/src/astro/utils/postgres_merge_func.py +++ b/src/astro/utils/postgres_merge_func.py @@ -13,13 +13,10 @@ See the License for the specific language governing permissions and limitations under the License. """ -try: - from airflow.providers.postgres.hooks.postgres import PostgresHook -except ModuleNotFoundError: - from astro.utils.schema_util import RaiseException as PostgresHook from psycopg2 import sql from astro.sql.table import Table +from astro.utils.dependencies import PostgresHook def postgres_merge_func( diff --git a/src/astro/utils/postgres_transform.py b/src/astro/utils/postgres_transform.py index 42be2eaa6..b0e00bb4e 100644 --- a/src/astro/utils/postgres_transform.py +++ b/src/astro/utils/postgres_transform.py @@ -15,13 +15,10 @@ """ import inspect -try: - from airflow.providers.postgres.hooks.postgres import PostgresHook -except ModuleNotFoundError: - from astro.utils.schema_util import RaiseException as PostgresHook from psycopg2.extensions import AsIs from astro.sql.table import Table +from astro.utils.dependencies import PostgresHook def parse_template(sql): diff --git a/src/astro/utils/schema_util.py b/src/astro/utils/schema_util.py index 496ec91cb..be19c2cdf 100644 --- a/src/astro/utils/schema_util.py +++ b/src/astro/utils/schema_util.py @@ -50,11 +50,3 @@ def get_error_string_for_multiple_dbs(tables: List[Table]): :return: String: error string """ return f'Tables should belong to same db {", ".join([table.table_name for table in tables])}' - - -class RaiseException(object): - def __init__(self, module_name): - self.module_name = module_name - - def __getattr__(self, item): - raise RuntimeError(f"Error loading the module {self.module_name}") From 0a673d5cbfb7cbb3e8938a68bcfaf3cdbcb7f5a7 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Thu, 17 Feb 2022 22:28:34 +0530 Subject: [PATCH 05/16] Fixed static check. --- src/astro/utils/postgres_transform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/astro/utils/postgres_transform.py b/src/astro/utils/postgres_transform.py index 6589e23e0..ed67e4980 100644 --- a/src/astro/utils/postgres_transform.py +++ b/src/astro/utils/postgres_transform.py @@ -14,8 +14,8 @@ limitations under the License. """ -from psycopg2.extensions import AsIs from airflow.providers.postgres.hooks.postgres import PostgresHook +from psycopg2.extensions import AsIs from astro.sql.table import Table from astro.utils.dependencies import PostgresHook From 474b1b6fd06cedda88d83c56d84563fc799ac407 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Mon, 21 Feb 2022 16:56:52 +0530 Subject: [PATCH 06/16] Updated README with extras option. --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 45bce036b..4920edaea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,7 +49,7 @@ tests = [ "s3fs", "smart-open[s3]>=5.2.1", ] -gcs = [ +gcp = [ "apache-airflow-providers-google", "sqlalchemy-bigquery==1.3.0", "smart-open[gcs]>=5.2.1", From 074178e0a9874bee55d25a08301161c297d36d20 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Mon, 21 Feb 2022 18:51:46 +0530 Subject: [PATCH 07/16] Updated readme with extras option. --- README.md | 9 ++++++++- tests/operators/test_missing_package.py | 0 2 files changed, 8 insertions(+), 1 deletion(-) create mode 100644 tests/operators/test_missing_package.py diff --git a/README.md b/README.md index d011d5ea0..28a46e413 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,14 @@ To start using `astro`: Alternatively, you can add `astro-projects` to your `requirements.txt` file. -2. Set the following environment variable so that `astro` can pass table objects between tasks: + +2. Installing `astro` with extras(i.e., gcp, snowflake, postgres) + + ```shell script + pip install astro-projects[gcp,snowflake,postgres] + ``` + +3. Set the following environment variable so that `astro` can pass table objects between tasks: ```shell script AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True diff --git a/tests/operators/test_missing_package.py b/tests/operators/test_missing_package.py new file mode 100644 index 000000000..e69de29bb From 00598a23e90c9b22d9a02066156b17199cc61005 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Mon, 21 Feb 2022 20:32:55 +0530 Subject: [PATCH 08/16] Updated error message with pip install command when a package is missing. --- src/astro/constants.py | 1 + src/astro/utils/dependencies.py | 19 ++++++++++++++----- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/astro/constants.py b/src/astro/constants.py index 0796cbb75..e0065973b 100644 --- a/src/astro/constants.py +++ b/src/astro/constants.py @@ -1 +1,2 @@ DEFAULT_CHUNK_SIZE = 1000000 +PYPI_PROJECT_NAME = "astro-projects" diff --git a/src/astro/utils/dependencies.py b/src/astro/utils/dependencies.py index ff126ca38..1e17ebc4b 100644 --- a/src/astro/utils/dependencies.py +++ b/src/astro/utils/dependencies.py @@ -13,30 +13,39 @@ See the License for the specific language governing permissions and limitations under the License. """ +from astro import constants class MissingPackage(object): - def __init__(self, module_name): + def __init__(self, module_name, related_extras): self.module_name = module_name + self.related_extras = related_extras def __getattr__(self, item): raise RuntimeError( f"Error loading the module {self.module_name}," - f" please make sure all the dependencies are installed" + f" please make sure all the dependencies are installed." + f" try - pip install {constants.PYPI_PROJECT_NAME}[{self.related_extras}]" ) try: from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook except ModuleNotFoundError: - BigQueryHook = MissingPackage("airflow.providers.google.cloud.hooks.bigquery") + BigQueryHook = MissingPackage( + "airflow.providers.google.cloud.hooks.bigquery", "gcp" + ) try: from airflow.providers.postgres.hooks.postgres import PostgresHook except ModuleNotFoundError: - PostgresHook = MissingPackage("airflow.providers.postgres.hooks.postgres") + PostgresHook = MissingPackage( + "airflow.providers.postgres.hooks.postgres", "postgres" + ) try: from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook except ModuleNotFoundError: - SnowflakeHook = MissingPackage("airflow.providers.snowflake.hooks.snowflake") + SnowflakeHook = MissingPackage( + "airflow.providers.snowflake.hooks.snowflake", "snowflake" + ) From 00638aea42dc5d7b2fb72061ee659a886765dccd Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Mon, 21 Feb 2022 20:41:33 +0530 Subject: [PATCH 09/16] Updated the extras package names. --- README.md | 2 +- pyproject.toml | 4 ++-- src/astro/utils/dependencies.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 28a46e413..3c734a5e2 100644 --- a/README.md +++ b/README.md @@ -88,7 +88,7 @@ To start using `astro`: 2. Installing `astro` with extras(i.e., gcp, snowflake, postgres) ```shell script - pip install astro-projects[gcp,snowflake,postgres] + pip install astro-projects[google,snowflake,postgres] ``` 3. Set the following environment variable so that `astro` can pass table objects between tasks: diff --git a/pyproject.toml b/pyproject.toml index e0bca97b3..f6c760a29 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,7 +50,7 @@ tests = [ "s3fs", "smart-open[s3]>=5.2.1", ] -gcp = [ +google = [ "apache-airflow-providers-google", "sqlalchemy-bigquery==1.3.0", "smart-open[gcs]>=5.2.1", @@ -63,7 +63,7 @@ snowflake = [ postgres = [ "apache-airflow-providers-postgres", ] -aws = [ +amazon = [ "s3fs", "smart-open[s3]>=5.2.1", ] diff --git a/src/astro/utils/dependencies.py b/src/astro/utils/dependencies.py index 1e17ebc4b..33f541200 100644 --- a/src/astro/utils/dependencies.py +++ b/src/astro/utils/dependencies.py @@ -33,7 +33,7 @@ def __getattr__(self, item): from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook except ModuleNotFoundError: BigQueryHook = MissingPackage( - "airflow.providers.google.cloud.hooks.bigquery", "gcp" + "airflow.providers.google.cloud.hooks.bigquery", "google" ) try: From ef16bf9ddce9ce79fbe32bd20cdeca1ddd473635 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Tue, 22 Feb 2022 15:40:09 +0530 Subject: [PATCH 10/16] Added testcases for the missing package. --- tests/operators/test_missing_package.py | 130 ++++++++++++++++++++++++ 1 file changed, 130 insertions(+) diff --git a/tests/operators/test_missing_package.py b/tests/operators/test_missing_package.py index e69de29bb..100c10a9e 100644 --- a/tests/operators/test_missing_package.py +++ b/tests/operators/test_missing_package.py @@ -0,0 +1,130 @@ +""" +Copyright Astronomer, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import pytest + +""" +Unittest module to test Operators. + +Requires the unittest, pytest, and requests-mock Python libraries. + +""" + +import logging +import math +import os +import pathlib +import unittest.mock + +from airflow.models import DAG +from airflow.utils import timezone +from airflow.utils.state import State +from airflow.utils.types import DagRunType + +log = logging.getLogger(__name__) +DEFAULT_DATE = timezone.datetime(2016, 1, 1) + +original_import = __import__ + + +def import_mock(name, *args): + if name in [ + "airflow.providers.google.cloud.hooks.bigquery", + "airflow.providers.postgres.hooks.postgres", + "airflow.providers.snowflake.hooks.snowflake", + ]: + raise ModuleNotFoundError + return original_import(name, *args) + + +class TestAggregateCheckOperator(unittest.TestCase): + """ + Test Postgres Merge Operator. + """ + + cwd = pathlib.Path(__file__).parent + + @classmethod + def setUpClass(cls): + super().setUpClass() + + def clear_run(self): + self.run = False + + def setUp(self): + super().setUp() + self.dag = DAG( + "test_dag", + default_args={ + "owner": "airflow", + "start_date": DEFAULT_DATE, + }, + ) + + def create_and_run_task(self, decorator_func, op_args, op_kwargs): + with self.dag: + f = decorator_func(*op_args, **op_kwargs) + + dr = self.dag.create_dagrun( + run_id=DagRunType.MANUAL.value, + start_date=timezone.utcnow(), + execution_date=DEFAULT_DATE, + data_interval=[DEFAULT_DATE, DEFAULT_DATE], + state=State.RUNNING, + ) + f.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + return f + + def test_missing_bigquery_package(self): + with unittest.mock.patch("builtins.__import__", import_mock): + from astro.utils.dependencies import BigQueryHook + + with pytest.raises(RuntimeError) as error: + BigQueryHook.conn_type + + assert ( + str(error.value) + == "Error loading the module airflow.providers.google.cloud.hooks.bigquery," + " please make sure all the dependencies are installed. try - pip install" + " astro-projects[google]" + ) + + def test_missing_postgres_package(self): + with unittest.mock.patch("builtins.__import__", import_mock): + from astro.utils.dependencies import PostgresHook + + with pytest.raises(RuntimeError) as error: + PostgresHook.conn_type + + assert ( + str(error.value) + == "Error loading the module airflow.providers.postgres.hooks.postgres," + " please make sure all the dependencies are installed. try - pip install" + " astro-projects[postgres]" + ) + + def test_missing_snowflake_package(self): + with unittest.mock.patch("builtins.__import__", import_mock): + from astro.utils.dependencies import SnowflakeHook + + with pytest.raises(RuntimeError) as error: + SnowflakeHook.conn_type + + assert ( + str(error.value) + == "Error loading the module airflow.providers.snowflake.hooks.snowflake," + " please make sure all the dependencies are installed. try - pip install" + " astro-projects[snowflake]" + ) From e9a67a3512c99ccb476a9363cf98252319aa4255 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Tue, 22 Feb 2022 15:57:46 +0530 Subject: [PATCH 11/16] Added a docstring for TestMissingPackages class. --- src/astro/__init__.py | 1 - tests/operators/test_missing_package.py | 11 +++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/astro/__init__.py b/src/astro/__init__.py index 069b993e3..a8b216122 100644 --- a/src/astro/__init__.py +++ b/src/astro/__init__.py @@ -17,7 +17,6 @@ """A decorator that allows users to run SQL queries natively in Airflow.""" __version__ = "0.5.1" -from astro.dataframe import dataframe # This is needed to allow Airflow to pick up specific metadata fields it needs diff --git a/tests/operators/test_missing_package.py b/tests/operators/test_missing_package.py index 100c10a9e..1ae204dd1 100644 --- a/tests/operators/test_missing_package.py +++ b/tests/operators/test_missing_package.py @@ -49,9 +49,16 @@ def import_mock(name, *args): return original_import(name, *args) -class TestAggregateCheckOperator(unittest.TestCase): +class TestMissingPackages(unittest.TestCase): """ - Test Postgres Merge Operator. + Test Missing packages. + + NOTE - These testcases will fail in case if we import any dependencies in 'astro/__init__.py' + which directly import or import a package which import below-mentioned packages. + + airflow.providers.google.cloud.hooks.bigquery + airflow.providers.postgres.hooks.postgres + airflow.providers.snowflake.hooks.snowflake """ cwd = pathlib.Path(__file__).parent From 80108865582fc3bcc88cebaae35f026315c6f72f Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Tue, 22 Feb 2022 15:59:39 +0530 Subject: [PATCH 12/16] Removed unwamted code. --- tests/operators/test_missing_package.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/tests/operators/test_missing_package.py b/tests/operators/test_missing_package.py index 1ae204dd1..b0cccd347 100644 --- a/tests/operators/test_missing_package.py +++ b/tests/operators/test_missing_package.py @@ -67,32 +67,8 @@ class TestMissingPackages(unittest.TestCase): def setUpClass(cls): super().setUpClass() - def clear_run(self): - self.run = False - def setUp(self): super().setUp() - self.dag = DAG( - "test_dag", - default_args={ - "owner": "airflow", - "start_date": DEFAULT_DATE, - }, - ) - - def create_and_run_task(self, decorator_func, op_args, op_kwargs): - with self.dag: - f = decorator_func(*op_args, **op_kwargs) - - dr = self.dag.create_dagrun( - run_id=DagRunType.MANUAL.value, - start_date=timezone.utcnow(), - execution_date=DEFAULT_DATE, - data_interval=[DEFAULT_DATE, DEFAULT_DATE], - state=State.RUNNING, - ) - f.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - return f def test_missing_bigquery_package(self): with unittest.mock.patch("builtins.__import__", import_mock): From bfc44b0b141fc134fe0b08be81b5f3116e4e341a Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Tue, 22 Feb 2022 17:04:51 +0530 Subject: [PATCH 13/16] Removed reference to provider packages directly. --- src/astro/utils/postgres_transform.py | 1 - src/astro/utils/schema_util.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/astro/utils/postgres_transform.py b/src/astro/utils/postgres_transform.py index ed67e4980..2dfc0f7fe 100644 --- a/src/astro/utils/postgres_transform.py +++ b/src/astro/utils/postgres_transform.py @@ -14,7 +14,6 @@ limitations under the License. """ -from airflow.providers.postgres.hooks.postgres import PostgresHook from psycopg2.extensions import AsIs from astro.sql.table import Table diff --git a/src/astro/utils/schema_util.py b/src/astro/utils/schema_util.py index be19c2cdf..18616bfe0 100644 --- a/src/astro/utils/schema_util.py +++ b/src/astro/utils/schema_util.py @@ -2,10 +2,10 @@ from typing import List from airflow.hooks.base import BaseHook -from airflow.providers.postgres.hooks.postgres import PostgresHook from psycopg2 import sql from astro.sql.table import Table +from astro.utils.dependencies import PostgresHook def set_schema_query(conn_type, hook, schema_id, user): From 63573c775d672d84617e9d26122560fbc5a57097 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Tue, 22 Feb 2022 17:09:06 +0530 Subject: [PATCH 14/16] Moved unit testcase to miscellaneous dir. --- tests/{operators => miscellaneous}/test_missing_package.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/{operators => miscellaneous}/test_missing_package.py (100%) diff --git a/tests/operators/test_missing_package.py b/tests/miscellaneous/test_missing_package.py similarity index 100% rename from tests/operators/test_missing_package.py rename to tests/miscellaneous/test_missing_package.py From d6440e0c9dd63ecad63d78f9aaa060ea9e472b62 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Tue, 22 Feb 2022 20:04:12 +0530 Subject: [PATCH 15/16] 1. Fixed testcases 2. Removed TempSnowflake hook. --- src/astro/__init__.py | 2 ++ src/astro/sql/operators/agnostic_save_file.py | 5 ++--- src/astro/utils/load_dataframe.py | 7 +++---- src/astro/utils/postgres_transform.py | 2 -- tests/miscellaneous/test_missing_package.py | 4 ++++ 5 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/astro/__init__.py b/src/astro/__init__.py index a8b216122..851bf60c7 100644 --- a/src/astro/__init__.py +++ b/src/astro/__init__.py @@ -18,6 +18,8 @@ __version__ = "0.5.1" +from astro.dataframe import dataframe + # This is needed to allow Airflow to pick up specific metadata fields it needs # for certain features. We recognize it's a bit unclean to define these in diff --git a/src/astro/sql/operators/agnostic_save_file.py b/src/astro/sql/operators/agnostic_save_file.py index a721408be..480bc794b 100644 --- a/src/astro/sql/operators/agnostic_save_file.py +++ b/src/astro/sql/operators/agnostic_save_file.py @@ -24,10 +24,9 @@ from airflow.models import BaseOperator, DagRun, TaskInstance from smart_open import open -from astro.sql.operators.temp_hooks import TempSnowflakeHook from astro.sql.table import Table from astro.utils.cloud_storage_creds import gcs_client, s3fs_creds -from astro.utils.dependencies import BigQueryHook, PostgresHook +from astro.utils.dependencies import BigQueryHook, PostgresHook, SnowflakeHook from astro.utils.schema_util import get_table_name from astro.utils.task_id_helper import get_task_id @@ -86,7 +85,7 @@ def execute(self, context): "postgres": PostgresHook( postgres_conn_id=input_table.conn_id, schema=input_table.database ), - "snowflake": TempSnowflakeHook( + "snowflake": SnowflakeHook( snowflake_conn_id=input_table.conn_id, database=input_table.database, schema=input_table.schema, diff --git a/src/astro/utils/load_dataframe.py b/src/astro/utils/load_dataframe.py index a559ee99c..1a9600a3e 100644 --- a/src/astro/utils/load_dataframe.py +++ b/src/astro/utils/load_dataframe.py @@ -19,8 +19,7 @@ from pandas.io.sql import SQLDatabase from snowflake.connector import pandas_tools -from astro.sql.operators.temp_hooks import TempSnowflakeHook -from astro.utils.dependencies import BigQueryHook, PostgresHook +from astro.utils.dependencies import BigQueryHook, PostgresHook, SnowflakeHook from astro.utils.schema_util import set_schema_query @@ -36,10 +35,10 @@ def move_dataframe_to_sql( chunksize, ): # Select database Hook based on `conn` type - hook: Union[PostgresHook, TempSnowflakeHook] = { # type: ignore + hook: Union[PostgresHook, SnowflakeHook] = { # type: ignore "postgresql": PostgresHook(postgres_conn_id=conn_id, schema=database), "postgres": PostgresHook(postgres_conn_id=conn_id, schema=database), - "snowflake": TempSnowflakeHook( + "snowflake": SnowflakeHook( snowflake_conn_id=conn_id, database=database, schema=schema, diff --git a/src/astro/utils/postgres_transform.py b/src/astro/utils/postgres_transform.py index 2dfc0f7fe..8eef129d2 100644 --- a/src/astro/utils/postgres_transform.py +++ b/src/astro/utils/postgres_transform.py @@ -14,8 +14,6 @@ limitations under the License. """ -from psycopg2.extensions import AsIs - from astro.sql.table import Table from astro.utils.dependencies import PostgresHook diff --git a/tests/miscellaneous/test_missing_package.py b/tests/miscellaneous/test_missing_package.py index b0cccd347..60af55818 100644 --- a/tests/miscellaneous/test_missing_package.py +++ b/tests/miscellaneous/test_missing_package.py @@ -26,6 +26,7 @@ import math import os import pathlib +import sys import unittest.mock from airflow.models import DAG @@ -65,6 +66,9 @@ class TestMissingPackages(unittest.TestCase): @classmethod def setUpClass(cls): + # Removed cached module from sys.module to make any + # code post mocking 'astro.utils.dependencies' work. + sys.modules.pop("astro.utils.dependencies", None) super().setUpClass() def setUp(self): From 3c0fbfb4b4ab74b381ec9e779b4ca7c6fd777e0d Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Tue, 22 Feb 2022 20:23:53 +0530 Subject: [PATCH 16/16] Deleted temp_hook.py --- src/astro/sql/operators/temp_hooks.py | 33 --------------------------- 1 file changed, 33 deletions(-) delete mode 100644 src/astro/sql/operators/temp_hooks.py diff --git a/src/astro/sql/operators/temp_hooks.py b/src/astro/sql/operators/temp_hooks.py deleted file mode 100644 index e0180bee1..000000000 --- a/src/astro/sql/operators/temp_hooks.py +++ /dev/null @@ -1,33 +0,0 @@ -""" -Copyright Astronomer, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from urllib.parse import quote_plus - -from astro.utils.dependencies import SnowflakeHook - - -class TempSnowflakeHook(SnowflakeHook): - """ - Temporary class to get around a bug in the snowflakehook when creating URIs - """ - - def get_uri(self) -> str: - """Override DbApiHook get_uri method for get_sqlalchemy_engine()""" - conn_config = self._get_conn_params() - uri = ( - "snowflake://{user}:{password}@{account}/{database}/{schema}" - "?warehouse={warehouse}&role={role}&authenticator={authenticator}" - ) - return uri.format(**conn_config)