diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 06a1530..34086c7 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,6 +1,6 @@
 repos:
 - repo: https://github.com/pre-commit/pre-commit-hooks
-  rev: v4.3.0
+  rev: v4.5.0
   hooks:
   - id: trailing-whitespace
   - id: end-of-file-fixer
@@ -12,21 +12,16 @@ repos:
   - id: name-tests-test
     args: [--pytest-test-first]
   - id: requirements-txt-fixer
-- repo: https://github.com/pycqa/flake8
-  rev: 5.0.4
-  hooks:
-  - id: flake8
-    args: ["--statistics", "--count", "--max-complexity=10", "--max-line-length=120", "--per-file-ignore=__init__.py: F401"]
-- repo: https://github.com/psf/black
-  rev: 22.3.0
-  hooks:
-  - id: black
-    args: [--line-length=120]
-- repo: https://github.com/PyCQA/isort
-  rev: 5.12.0
-  hooks:
-  - id: isort
-    args: ["--profile", "black", --line-length=120]
+- repo: https://github.com/astral-sh/ruff-pre-commit
+  # Ruff version.
+  rev: v0.4.4
+  hooks:
+    # Run the linter.
+    - id: ruff
+      args: [ --fix ]
+    # Run the formatter.
+    - id: ruff-format
+      args: [ --line-length=120 ]
 - repo: local
   hooks:
   - id: check-requirements
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 63aa291..6a0d87b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,3 +15,47 @@ Changelog
 # 0.5.2
 
 - Removed the required environment variables `ls_sql_name` and `ls_blob_name` since it's standard.
+
+# 0.5.3
+
+- Added `dtype` argument, so users can specify their own SqlAlchemy dtype for certain columns.
+
+# 0.5.4
+
+- Set requirements to specific versions
+
+# 0.6.0
+
+- Always create linked services for blob and SQL. This way the user can switch source blob storages and sink databases more easily.
+- Use environment variable `AZURE_STORAGE_CONNECTION_STRING` for parquet upload
+- If parquet upload is a single file, place it in the root of the folder
+
+# 0.7.0
+
+- Add upsert for parquet files
+- Set logging level to debug for queries
+- Fix bugs and add requirements checks
+- Fix bug for staging schema with upsert
+- Add support for all pandas int dtypes
+- Add customisable container name
+
+# 0.8.0
+
+- Upgrade dependency packages
+- Fix failing pipeline because of removed staging schema
+
+# 0.9.0
+
+- Add more dtypes
+- Upgrade package version
+- Fix bug when dataframe is empty
+
+# 0.9.1
+
+- Fix bug with categorical dtype
+- Make pyodbc driver dynamic
+
+# 1.0.0
+
+- Upgrade packages and set minimum versions
+- Fix code to work with upgraded packages
diff --git a/df_to_azure/__init__.py b/df_to_azure/__init__.py
index 72f93b0..d376ed7 100644
--- a/df_to_azure/__init__.py
+++ b/df_to_azure/__init__.py
@@ -1,8 +1,8 @@
 import logging
 
-from .export import df_to_azure
+from .export import df_to_azure as df_to_azure
 
-__version__ = "0.9.1"
+__version__ = "1.0.0"
 
 logging.basicConfig(
     format="%(asctime)s.%(msecs)03d [%(levelname)-5s] [%(name)s] - %(message)s",
diff --git a/df_to_azure/adf.py b/df_to_azure/adf.py
index 884dbd7..fd6b889 100644
--- a/df_to_azure/adf.py
+++ b/df_to_azure/adf.py
@@ -174,7 +174,7 @@ def create_linked_service_blob(self):
 
     def create_input_blob(self):
         ds_name = f"BLOB_dftoazure_{self.table_name}"
-        ds_ls = LinkedServiceReference(reference_name=self.ls_blob_name)
+        ds_ls = LinkedServiceReference(type="LinkedServiceReference", reference_name=self.ls_blob_name)
         ds_azure_blob = AzureBlobDataset(
             linked_service_name=ds_ls,
             folder_path=f"dftoazure/{self.table_name}",
@@ -185,10 +185,9 @@ def create_input_blob(self):
         self.adf_client.datasets.create_or_update(self.rg_name, self.df_name, ds_name, ds_azure_blob)
 
     def create_output_sql(self):
-
         ds_name = f"SQL_dftoazure_{self.table_name}"
 
-        ds_ls = LinkedServiceReference(reference_name=self.ls_sql_name)
+        ds_ls = LinkedServiceReference(type="LinkedServiceReference", reference_name=self.ls_sql_name)
         data_azure_sql = AzureSqlTableDataset(
             linked_service_name=ds_ls,
             table_name=f"{self.schema}.{self.table_name}",
@@ -197,7 +196,6 @@
         self.adf_client.datasets.create_or_update(self.rg_name, self.df_name, ds_name, data_azure_sql)
 
     def create_pipeline(self, pipeline_name):
-
         activities = [self.create_copy_activity()]
         # If user wants to upsert, we append stored procedure activity to pipeline.
         if self.method == "upsert":
@@ -219,8 +217,8 @@ def create_copy_activity(self):
         blob_source = BlobSource()
         sql_sink = SqlSink()
 
-        ds_in_ref = DatasetReference(reference_name=f"BLOB_dftoazure_{self.table_name}")
-        ds_out_ref = DatasetReference(reference_name=f"SQL_dftoazure_{self.table_name}")
+        ds_in_ref = DatasetReference(type="DatasetReference", reference_name=f"BLOB_dftoazure_{self.table_name}")
+        ds_out_ref = DatasetReference(type="DatasetReference", reference_name=f"SQL_dftoazure_{self.table_name}")
         copy_activity = CopyActivity(
             name=act_name,
             inputs=[ds_in_ref],
@@ -236,7 +234,9 @@ def stored_procedure_activity(self):
         dependency = ActivityDependency(
             activity=f"Copy {self.table_name} to SQL", dependency_conditions=[dependency_condition]
         )
-        linked_service_reference = LinkedServiceReference(reference_name=self.ls_sql_name)
+        linked_service_reference = LinkedServiceReference(
+            type="LinkedServiceReference", reference_name=self.ls_sql_name
+        )
         activity = SqlServerStoredProcedureActivity(
             stored_procedure_name=f"UPSERT_{self.table_name}",
             name="UPSERT procedure",
diff --git a/df_to_azure/db.py b/df_to_azure/db.py
index 46157b8..6dc890d 100644
--- a/df_to_azure/db.py
+++ b/df_to_azure/db.py
@@ -49,11 +49,11 @@ def create_merge_query(self):
         """
 
         logging.debug(query)
-        return query
+        return text(query)
 
     def drop_procedure(self):
         query = f"DROP PROCEDURE IF EXISTS [UPSERT_{self.table_name}];"
-        return query
+        return text(query)
 
     def create_stored_procedure(self):
         with auth_azure() as con:
@@ -73,7 +73,6 @@
 
 
 def auth_azure(driver: str = None):
-
     if driver is None:
         import pyodbc
 
diff --git a/df_to_azure/export.py b/df_to_azure/export.py
index 2ac478d..6309886 100644
--- a/df_to_azure/export.py
+++ b/df_to_azure/export.py
@@ -9,8 +9,7 @@
 from azure.storage.blob import BlobServiceClient
 from pandas import CategoricalDtype, DataFrame
 from pandas.api.types import is_bool_dtype, is_datetime64_any_dtype, is_float_dtype, is_integer_dtype, is_string_dtype
-from sqlalchemy.sql.visitors import VisitableType
-from sqlalchemy.types import BigInteger, Boolean, DateTime, Integer, Numeric, String
+from sqlalchemy.types import BigInteger, Boolean, DateTime, Integer, Numeric, String, TypeEngine
 
 from df_to_azure.adf import ADF
 from df_to_azure.db import SqlUpsert, auth_azure, execute_stmt
@@ -34,7 +33,6 @@
     clean_staging=True,
     container_name="parquet",
 ):
-
     if parquet:
         DfToParquet(
             df=df,
@@ -96,13 +94,11 @@
         self.clean_staging = clean_staging
 
     def run(self):
-
         if self.df.empty:
             logging.info("Data empty, no new records to upload.")
             return None, None
 
         if self.create:
-
             # azure components
             self.create_resourcegroup()
             self.create_datafactory()
@@ -133,11 +129,10 @@
 
     def _checks(self):
         if self.dtypes:
-            if not all([type(given_type) == VisitableType for given_type in self.dtypes.keys()]):
+            if not all([type(given_type) == TypeEngine for given_type in self.dtypes.keys()]):
                 WrongDtypeError("Wrong dtype given, only SqlAlchemy types are accepted")
 
     def upload_dataset(self):
-
         if self.method == "create":
             self.create_schema()
             self.push_to_azure()
diff --git a/df_to_azure/tests/test_append.py b/df_to_azure/tests/test_append.py
index 1a46e7d..bb76a7f 100644
--- a/df_to_azure/tests/test_append.py
+++ b/df_to_azure/tests/test_append.py
@@ -10,7 +10,6 @@
 # #### APPEND METHOD TESTS ####
 # #############################
 def test_append():
-
     df = DataFrame({"A": [1, 2, 3], "B": list("abc"), "C": [4.0, 5.0, nan]})
 
     # 1. we create a new dataframe
diff --git a/df_to_azure/tests/test_zz_clean_up.py b/df_to_azure/tests/test_zz_clean_up.py
index 6ad3643..a1c666f 100644
--- a/df_to_azure/tests/test_zz_clean_up.py
+++ b/df_to_azure/tests/test_zz_clean_up.py
@@ -1,4 +1,5 @@
 from df_to_azure.db import auth_azure
+from sqlalchemy.sql import text
 
 
 # --- CLEAN UP ----
@@ -36,5 +37,5 @@ def test_clean_up_db():
         with con.begin():
             for schema, tables in tables_dict.items():
                 for table in tables:
-                    query = f"DROP TABLE IF EXISTS {schema}.{table};"
+                    query = text(f"DROP TABLE IF EXISTS {schema}.{table};")
                     con.execute(query)
diff --git a/requirements.txt b/requirements.txt
index 6c846fb..56e8a07 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,8 +1,8 @@
-azure-identity>=1.7.1
-azure-mgmt-datafactory>=2.2.0,<2.7.0
-azure-mgmt-resource>=20.1.0
-azure-storage-blob>=12.8.1
-pandas>=1.5.0
-pyarrow>=7.0.0
-pyodbc>=4.0.32
-sqlalchemy>=1.4.31,<2.0.0
+azure-identity>=1.12.0
+azure-mgmt-datafactory>=7.1.0
+azure-mgmt-resource>=23.1.1
+azure-storage-blob>=12.20.0
+pandas>=2.2.2
+pyarrow>=16.1.0
+pyodbc>=5.1.0
+sqlalchemy>=2.0.30
diff --git a/ruff.toml b/ruff.toml
new file mode 100644
index 0000000..f467ae0
--- /dev/null
+++ b/ruff.toml
@@ -0,0 +1,2 @@
+# Set the maximum line length to 120.
+line-length = 120
diff --git a/setup.cfg b/setup.cfg
index d0f3924..aed4f49 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,6 +1,6 @@
 [metadata]
 name = df_to_azure
-version = 0.9.1
+version = 1.0.0
 author = Zypp
 author_email = hello@zypp.io
 description = Automatically write pandas DataFrames to SQL by creating pipelines in Azure Data Factory with copy activity from blob to SQL
@@ -18,16 +18,17 @@ classifiers =
 
 [options]
 packages = df_to_azure
-python_requires = >=3.7
+python_requires = >=3.9
 install_requires =
-    azure-identity>=1.7.1
-    azure-mgmt-datafactory>=2.2.0,<2.7.0
-    azure-mgmt-resource>=20.1.0
-    azure-storage-blob>=12.8.1
-    pandas>=1.5.0
-    pyarrow>=7.0.0
-    pyodbc>=4.0.32
-    sqlalchemy>=1.4.31,<2.0.0
+    azure-identity>=1.12.0
+    azure-mgmt-datafactory>=7.1.0
+    azure-mgmt-resource>=23.1.1
+    azure-storage-blob>=12.20.0
+    pandas>=2.2.2
+    pyarrow>=16.1.0
+    pyodbc>=5.1.0
+    sqlalchemy>=2.0.30
+
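A note on the `text()` wrappers introduced in `df_to_azure/db.py` and `test_zz_clean_up.py`: SQLAlchemy 2.0 no longer accepts plain SQL strings in `Connection.execute()`, so raw statements are wrapped in `sqlalchemy.text()` before execution. A minimal, self-contained sketch of the pattern, using an in-memory SQLite engine purely for illustration:

```python
from sqlalchemy import create_engine, text

# Stand-in engine for illustration; df_to_azure connects to Azure SQL instead.
engine = create_engine("sqlite:///:memory:")

with engine.begin() as con:
    # Plain strings raise an error in SQLAlchemy 2.0; text() marks them as executable SQL.
    con.execute(text("CREATE TABLE demo (id INTEGER)"))
    con.execute(text("DROP TABLE IF EXISTS demo"))
```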
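On the import change in `export.py`: `sqlalchemy.sql.visitors.VisitableType` no longer exists in SQLAlchemy 2.0, and `TypeEngine` is the common base class of all column types, so an `isinstance` check is the usual way to validate user-supplied dtypes. Note that the `type(given_type) == TypeEngine` comparison in the patched `_checks` only matches the base class itself, never subclasses such as `String` or `Integer`. A small sketch of the isinstance-based variant (not the package's code; the dtype mapping below is hypothetical):

```python
from sqlalchemy.types import Integer, String, TypeEngine

# Hypothetical dtype mapping as a user might pass it: column name -> SQLAlchemy type instance.
dtypes = {"name": String(255), "amount": Integer()}

# isinstance() covers every TypeEngine subclass, whereas type(...) == TypeEngine never matches them.
assert all(isinstance(sql_type, TypeEngine) for sql_type in dtypes.values())
assert not any(type(sql_type) == TypeEngine for sql_type in dtypes.values())
```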
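The `type="LinkedServiceReference"` and `type="DatasetReference"` arguments added throughout `adf.py` reflect that recent `azure-mgmt-datafactory` model classes take the reference discriminator explicitly rather than defaulting it. Roughly, with placeholder reference names (df_to_azure derives its own from the table name):

```python
from azure.mgmt.datafactory.models import DatasetReference, LinkedServiceReference

# Placeholder reference names, not the ones the package generates.
ls_ref = LinkedServiceReference(type="LinkedServiceReference", reference_name="ls_sql_example")
ds_ref = DatasetReference(type="DatasetReference", reference_name="BLOB_dftoazure_example")
```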
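The `auth_azure` hunk shows `pyodbc` being imported lazily to resolve a driver when none is given, in line with the 0.9.1 changelog entry "Make pyodbc driver dynamic". One common way to build such a connection URL, sketched with placeholder credentials rather than the package's actual logic:

```python
import pyodbc
from sqlalchemy import create_engine

# Pick any installed SQL Server ODBC driver instead of hard-coding a version.
drivers = [d for d in pyodbc.drivers() if "SQL Server" in d]
driver = drivers[-1] if drivers else "ODBC Driver 18 for SQL Server"

# Placeholder credentials/host; create_engine() does not connect until first use.
url = (
    "mssql+pyodbc://user:password@myserver.database.windows.net:1433/mydb"
    f"?driver={driver.replace(' ', '+')}"
)
engine = create_engine(url)
```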
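Finally, the dtype helpers imported in `export.py` (`is_integer_dtype` and friends) recognise pandas' nullable extension integers as well as NumPy-backed ones, which is presumably what the 0.7.0 entry about supporting all pandas int dtypes refers to; for example:

```python
from pandas import Series
from pandas.api.types import is_integer_dtype

# Both NumPy-backed and nullable pandas integers report as integer dtypes.
assert is_integer_dtype(Series([1, 2, 3], dtype="int32"))
assert is_integer_dtype(Series([1, None, 3], dtype="Int64"))
```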