Skip to content

Commit

Permalink
Manage dependencies (#113)
Browse files Browse the repository at this point in the history
Handled missing providers.

Co-authored-by: utkarsh sharma <utkarsharma@utkarshs-MacBook-Pro.local>
  • Loading branch information
utkarsharma2 and utkarsh sharma committed Mar 30, 2022
1 parent 9b72f15 commit f9eebef
Show file tree
Hide file tree
Showing 15 changed files with 229 additions and 66 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,14 @@ To start using `astro`:

Alternatively, you can add `astro-projects` to your `requirements.txt` file.

2. Set the following environment variable so that `astro` can pass table objects between tasks:

2. Install `astro` with extras (e.g., google, snowflake, postgres):

```shell script
pip install astro-projects[google,snowflake,postgres]
```

3. Set the following environment variable so that `astro` can pass table objects between tasks:

```shell script
AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True
Expand Down
50 changes: 41 additions & 9 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,10 @@ license = {file = "LICENSE"}
requires-python = ">=3.7"
dependencies = [
"apache-airflow >=2.0",
"markupsafe>=1.1.1,<2.1.0",
"apache-airflow-providers-postgres",
"apache-airflow-providers-snowflake",
"apache-airflow-providers-google",
"python-frontmatter",
"pandas >=1.3.4",
"s3fs",
"snowflake-sqlalchemy ==1.2.0",
"snowflake-connector-python[pandas]",
"smart-open[all]>=5.2.1",
"SQLAlchemy==1.3.24",
"sqlalchemy-bigquery==1.3.0",
"apache-airflow-providers-postgres",
"markupsafe>=1.1.1,<2.1.0"
]

Expand All @@ -49,6 +41,46 @@ tests = [
"pytest-split",
"pytest-dotenv",
"requests-mock",
"apache-airflow-providers-google",
"sqlalchemy-bigquery==1.3.0",
"smart-open[all]>=5.2.1",
"apache-airflow-providers-snowflake",
"snowflake-sqlalchemy ==1.2.0",
"snowflake-connector-python[pandas]",
"apache-airflow-providers-postgres",
"s3fs",
"smart-open[s3]>=5.2.1",
]
google = [
"apache-airflow-providers-google",
"sqlalchemy-bigquery==1.3.0",
"smart-open[gcs]>=5.2.1",
]
snowflake = [
"apache-airflow-providers-snowflake",
"snowflake-sqlalchemy ==1.2.0",
"snowflake-connector-python[pandas]",
]
postgres = [
"apache-airflow-providers-postgres",
]
amazon = [
"s3fs",
"smart-open[s3]>=5.2.1",
]
all = [
"pytest >=6.0",
"pytest-dotenv",
"requests-mock",
"apache-airflow-providers-google",
"sqlalchemy-bigquery==1.3.0",
"smart-open[all]>=5.2.1",
"apache-airflow-providers-snowflake",
"snowflake-sqlalchemy ==1.2.0",
"snowflake-connector-python[pandas]",
"apache-airflow-providers-postgres",
"s3fs",
"smart-open[s3]>=5.2.1",
]

[project.urls]
Expand Down
1 change: 1 addition & 0 deletions src/astro/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""A decorator that allows users to run SQL queries natively in Airflow."""

__version__ = "0.5.1"

from astro.dataframe import dataframe


Expand Down
1 change: 1 addition & 0 deletions src/astro/constants.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
DEFAULT_CHUNK_SIZE = 1000000
PYPI_PROJECT_NAME = "astro-projects"
7 changes: 2 additions & 5 deletions src/astro/sql/operators/agnostic_save_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,11 @@
import pandas as pd
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator, DagRun, TaskInstance
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from google.cloud.storage import Client
from smart_open import open

from astro.sql.operators.temp_hooks import TempSnowflakeHook
from astro.sql.table import Table
from astro.utils.cloud_storage_creds import gcs_client, s3fs_creds
from astro.utils.dependencies import BigQueryHook, PostgresHook, SnowflakeHook
from astro.utils.schema_util import get_schema, get_table_name
from astro.utils.task_id_helper import get_task_id

Expand Down Expand Up @@ -88,7 +85,7 @@ def execute(self, context):
"postgres": PostgresHook(
postgres_conn_id=input_table.conn_id, schema=input_table.database
),
"snowflake": TempSnowflakeHook(
"snowflake": SnowflakeHook(
snowflake_conn_id=input_table.conn_id,
database=input_table.database,
schema=input_table.schema,
Expand Down
3 changes: 1 addition & 2 deletions src/astro/sql/operators/sql_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
import pandas as pd
from airflow.decorators.base import DecoratedOperator
from airflow.hooks.base import BaseHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

from astro.constants import DEFAULT_CHUNK_SIZE
from astro.sql.table import Table, TempTable, create_table_name
from astro.utils.dependencies import PostgresHook, SnowflakeHook
from astro.utils.load_dataframe import move_dataframe_to_sql
from astro.utils.schema_util import get_schema

Expand Down
4 changes: 1 addition & 3 deletions src/astro/sql/operators/sql_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@
from airflow.decorators.base import DecoratedOperator, task_decorator_factory
from airflow.hooks.base import BaseHook
from airflow.models import DagRun, TaskInstance
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.utils.db import provide_session
from sqlalchemy import text
from sqlalchemy.sql.functions import Function

from astro.sql.table import Table, create_table_name
from astro.utils import postgres_transform, snowflake_transform
from astro.utils.dependencies import BigQueryHook, PostgresHook, SnowflakeHook
from astro.utils.load_dataframe import move_dataframe_to_sql
from astro.utils.schema_util import get_schema, set_schema_query

Expand Down
34 changes: 0 additions & 34 deletions src/astro/sql/operators/temp_hooks.py

This file was deleted.

3 changes: 0 additions & 3 deletions src/astro/utils/bigquery_merge_func.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
from airflow.providers.postgres.hooks.postgres import PostgresHook
from psycopg2 import sql

from astro.sql.table import Table


Expand Down
51 changes: 51 additions & 0 deletions src/astro/utils/dependencies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
Copyright Astronomer, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from astro import constants


class MissingPackage:
    """Placeholder bound in place of a provider hook class whose package
    failed to import.

    Instances are assigned to module-level names (e.g. ``BigQueryHook``)
    when the corresponding provider package is absent, so that importing
    this module never fails; any actual *use* of the missing hook —
    attribute access or instantiation — raises a ``RuntimeError`` telling
    the user which pip extra to install.
    """

    def __init__(self, module_name, related_extras):
        # Fully qualified path of the module that failed to import.
        self.module_name = module_name
        # Name of the pip extra (e.g. "google") that provides the module.
        self.related_extras = related_extras

    def __getattr__(self, item):
        # Only triggered for attributes NOT set in __init__, i.e. any
        # real attempt to use the missing provider hook.
        raise RuntimeError(
            f"Error loading the module {self.module_name},"
            f" please make sure all the dependencies are installed."
            f" try - pip install {constants.PYPI_PROJECT_NAME}[{self.related_extras}]"
        )


# Optional provider hooks: each import is attempted, and on failure the
# name is bound to a MissingPackage placeholder instead.  This keeps
# `from astro.utils.dependencies import <Hook>` working regardless of
# which extras are installed; a clear install hint is raised only when
# the missing hook is actually used.
try:
    from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
except ModuleNotFoundError:
    # Provided by the "google" extra (apache-airflow-providers-google).
    BigQueryHook = MissingPackage(
        "airflow.providers.google.cloud.hooks.bigquery", "google"
    )

try:
    from airflow.providers.postgres.hooks.postgres import PostgresHook
except ModuleNotFoundError:
    # Provided by the "postgres" extra (apache-airflow-providers-postgres).
    PostgresHook = MissingPackage(
        "airflow.providers.postgres.hooks.postgres", "postgres"
    )

try:
    from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
except ModuleNotFoundError:
    # Provided by the "snowflake" extra (apache-airflow-providers-snowflake).
    SnowflakeHook = MissingPackage(
        "airflow.providers.snowflake.hooks.snowflake", "snowflake"
    )
8 changes: 3 additions & 5 deletions src/astro/utils/load_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
"""
from typing import Optional, Union

from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from pandas import DataFrame
from pandas.io.sql import SQLDatabase
from snowflake.connector import pandas_tools

from astro.sql.operators.temp_hooks import TempSnowflakeHook
from astro.utils.dependencies import BigQueryHook, PostgresHook, SnowflakeHook
from astro.utils.schema_util import set_schema_query


Expand All @@ -37,10 +35,10 @@ def move_dataframe_to_sql(
chunksize,
):
# Select database Hook based on `conn` type
hook: Union[PostgresHook, TempSnowflakeHook] = { # type: ignore
hook: Union[PostgresHook, SnowflakeHook] = { # type: ignore
"postgresql": PostgresHook(postgres_conn_id=conn_id, schema=database),
"postgres": PostgresHook(postgres_conn_id=conn_id, schema=database),
"snowflake": TempSnowflakeHook(
"snowflake": SnowflakeHook(
snowflake_conn_id=conn_id,
database=database,
schema=schema,
Expand Down
2 changes: 1 addition & 1 deletion src/astro/utils/postgres_merge_func.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
from airflow.providers.postgres.hooks.postgres import PostgresHook
from psycopg2 import sql

from astro.sql.table import Table
from astro.utils.dependencies import PostgresHook


def postgres_merge_func(
Expand Down
3 changes: 1 addition & 2 deletions src/astro/utils/postgres_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@
limitations under the License.
"""

from airflow.providers.postgres.hooks.postgres import PostgresHook

from astro.sql.table import Table
from astro.utils.dependencies import PostgresHook


def add_templates_to_context(parameters, context):
Expand Down
2 changes: 1 addition & 1 deletion src/astro/utils/schema_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
from typing import List

from airflow.hooks.base import BaseHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from psycopg2 import sql

from astro.sql.table import Table
from astro.utils.dependencies import PostgresHook


def set_schema_query(conn_type, hook, schema_id, user):
Expand Down
Loading

0 comments on commit f9eebef

Please sign in to comment.