Major update dependencies #129

Merged: 5 commits, merged Jun 4, 2024
27 changes: 11 additions & 16 deletions .pre-commit-config.yaml
@@ -1,6 +1,6 @@
 repos:
 - repo: https://github.com/pre-commit/pre-commit-hooks
-  rev: v4.3.0
+  rev: v4.5.0
   hooks:
   - id: trailing-whitespace
   - id: end-of-file-fixer
@@ -12,21 +12,16 @@ repos:
   - id: name-tests-test
     args: [--pytest-test-first]
   - id: requirements-txt-fixer
-- repo: https://github.com/pycqa/flake8
-  rev: 5.0.4
-  hooks:
-  - id: flake8
-    args: ["--statistics", "--count", "--max-complexity=10", "--max-line-length=120", "--per-file-ignore=__init__.py: F401"]
-- repo: https://github.com/psf/black
-  rev: 22.3.0
-  hooks:
-  - id: black
-    args: [--line-length=120]
-- repo: https://github.com/PyCQA/isort
-  rev: 5.12.0
-  hooks:
-  - id: isort
-    args: ["--profile", "black", --line-length=120]
+- repo: https://github.com/astral-sh/ruff-pre-commit
+  # Ruff version.
+  rev: v0.4.4
+  hooks:
+  # Run the linter.
+  - id: ruff
+    args: [ --fix ]
+  # Run the formatter.
+  - id: ruff-format
+    args: [ --line-length=120 ]
 - repo: local
   hooks:
   - id: check-requirements
44 changes: 44 additions & 0 deletions CHANGELOG.md
@@ -15,3 +15,47 @@ Changelog
 # 0.5.2
 
 - Removed the required environment variables `ls_sql_name` and `ls_blob_name`, since they're standard.
+
+# 0.5.3
+
+- Added `dtype` argument, so users can specify their own SQLAlchemy dtype for certain columns.
+
+# 0.5.4
+
+- Set requirements to specific versions
+
+# 0.6.0
+
+- Always create linked services for blob and sql. This way the user can switch source blob storages and sink databases more easily.
+- Use environment variable AZURE_STORAGE_CONNECTION_STRING for parquet upload
+- If parquet upload is a single file, place it in the root of the folder
+
+# 0.7.0
+
+- Add upsert for parquet files
+- Set logging level to debug for queries
+- Fix bugs and add requirements checks
+- Fix bug for staging schema with upsert
+- Add support for all pandas int dtypes
+- Add customisable container name
+
+# 0.8.0
+
+- Upgrade dependency packages
+- Fix failing pipeline because of removed staging schema
+
+# 0.9.0
+
+- Add more dtypes
+- Upgrade package version
+- Fix bug when dataframe is empty
+
+# 0.9.1
+
+- Fix bug with categorical dtype
+- Make pyodbc driver dynamic
+
+# 1.0.0
+
+- Upgrade packages and set minimal versions
+- Fix code to work with upgraded packages
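
A usage sketch for the `dtype` argument from 0.5.3, updated to the SQLAlchemy 2.0-style imports this PR moves to. Everything here is illustrative: the table and schema names are placeholders, and the keyword names (`tablename`, `dtypes`) are assumptions inferred from the attributes used in `export.py` below, not confirmed from the full function signature.

```python
# Illustrative sketch only: argument names are assumptions, not taken
# from this PR's diff. Azure credentials are read from the environment.
from pandas import DataFrame
from sqlalchemy.types import Numeric, String

from df_to_azure import df_to_azure

df = DataFrame({"id": [1, 2], "amount": [10.50, 20.25]})

df_to_azure(
    df=df,
    tablename="sample_table",  # assumed keyword name
    schema="dbo",
    method="create",
    dtypes={"id": String(length=10), "amount": Numeric(precision=18, scale=2)},
)
```
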
4 changes: 2 additions & 2 deletions df_to_azure/__init__.py
@@ -1,8 +1,8 @@
 import logging
 
-from .export import df_to_azure
+from .export import df_to_azure as df_to_azure
 
-__version__ = "0.9.1"
+__version__ = "1.0.0"
 
 logging.basicConfig(
     format="%(asctime)s.%(msecs)03d [%(levelname)-5s] [%(name)s] - %(message)s",
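
The seemingly redundant `import df_to_azure as df_to_azure` is the explicit re-export form from PEP 484: under mypy's `--no-implicit-reexport` option (part of strict mode), a bare `from .export import df_to_azure` in an `__init__.py` is not treated as part of the package's public API. A minimal sketch of the pattern:

```python
# Sketch: two equivalent ways to mark an explicit re-export in __init__.py.
from .export import df_to_azure as df_to_azure  # self-alias form (used in this PR)

# ...or, equivalently, a bare import plus __all__:
# from .export import df_to_azure
# __all__ = ["df_to_azure"]
```
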
14 changes: 7 additions & 7 deletions df_to_azure/adf.py
@@ -173,7 +173,7 @@ def create_linked_service_blob(self):
     def create_input_blob(self):
         ds_name = f"BLOB_dftoazure_{self.table_name}"
 
-        ds_ls = LinkedServiceReference(reference_name=self.ls_blob_name)
+        ds_ls = LinkedServiceReference(type="LinkedServiceReference", reference_name=self.ls_blob_name)
         ds_azure_blob = AzureBlobDataset(
             linked_service_name=ds_ls,
             folder_path=f"dftoazure/{self.table_name}",
@@ -192,10 +192,9 @@ def create_output_sql(self):
         self.adf_client.datasets.create_or_update(self.rg_name, self.df_name, ds_name, ds_azure_blob)
 
     def create_output_sql(self):
-
         ds_name = f"SQL_dftoazure_{self.table_name}"
 
-        ds_ls = LinkedServiceReference(reference_name=self.ls_sql_name)
+        ds_ls = LinkedServiceReference(type="LinkedServiceReference", reference_name=self.ls_sql_name)
         data_azure_sql = AzureSqlTableDataset(
             linked_service_name=ds_ls,
             table_name=f"{self.schema}.{self.table_name}",
@@ -204,7 +203,6 @@ def create_pipeline(self, pipeline_name):
         self.adf_client.datasets.create_or_update(self.rg_name, self.df_name, ds_name, data_azure_sql)
 
     def create_pipeline(self, pipeline_name):
-
         activities = [self.create_copy_activity()]
         # If user wants to upsert, we append stored procedure activity to pipeline.
         if self.method == "upsert":
@@ -226,8 +224,8 @@ def create_copy_activity(self):
         blob_source = BlobSource()
         sql_sink = SqlSink()
 
-        ds_in_ref = DatasetReference(reference_name=f"BLOB_dftoazure_{self.table_name}")
-        ds_out_ref = DatasetReference(reference_name=f"SQL_dftoazure_{self.table_name}")
+        ds_in_ref = DatasetReference(type="DatasetReference", reference_name=f"BLOB_dftoazure_{self.table_name}")
+        ds_out_ref = DatasetReference(type="DatasetReference", reference_name=f"SQL_dftoazure_{self.table_name}")
         copy_activity = CopyActivity(
             name=act_name,
             inputs=[ds_in_ref],
@@ -243,7 +241,9 @@ def stored_procedure_activity(self):
         dependency = ActivityDependency(
             activity=f"Copy {self.table_name} to SQL", dependency_conditions=[dependency_condition]
         )
-        linked_service_reference = LinkedServiceReference(reference_name=self.ls_sql_name)
+        linked_service_reference = LinkedServiceReference(
+            type="LinkedServiceReference", reference_name=self.ls_sql_name
+        )
         activity = SqlServerStoredProcedureActivity(
             stored_procedure_name=f"UPSERT_{self.table_name}",
             name="UPSERT procedure",
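
The recurring change in this file tracks the upgraded azure-mgmt-datafactory pinned by this PR (>=7.1.0), whose reference models take their type discriminator as an explicit keyword rather than filling it in implicitly. A minimal sketch (the resource names are hypothetical):

```python
# Sketch: constructing ADF reference objects with the explicit type
# discriminator required by newer azure-mgmt-datafactory model classes.
from azure.mgmt.datafactory.models import DatasetReference, LinkedServiceReference

ls_ref = LinkedServiceReference(type="LinkedServiceReference", reference_name="my_sql_ls")  # hypothetical name
ds_ref = DatasetReference(type="DatasetReference", reference_name="BLOB_dftoazure_sample")  # hypothetical name
```
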
5 changes: 2 additions & 3 deletions df_to_azure/db.py
@@ -49,11 +49,11 @@ def create_merge_query(self):
         """
         logging.debug(query)
 
-        return query
+        return text(query)
 
     def drop_procedure(self):
         query = f"DROP PROCEDURE IF EXISTS [UPSERT_{self.table_name}];"
-        return query
+        return text(query)
 
     def create_stored_procedure(self):
         with auth_azure() as con:
@@ -73,7 +73,6 @@ def create_stored_procedure(self):
 
 
 def auth_azure(driver: str = None):
-
     if driver is None:
         import pyodbc
 
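
The `text(...)` wrappers line up with SQLAlchemy 2.0, where `Connection.execute()` no longer accepts a plain SQL string. A minimal, self-contained sketch, using an in-memory SQLite URL as a stand-in for the project's Azure SQL connection:

```python
# Sketch: SQLAlchemy 2.0 removed implicit string execution, so raw SQL
# must be wrapped in text() before being passed to Connection.execute().
from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")  # stand-in URL; df_to_azure targets Azure SQL
with engine.begin() as con:
    con.execute(text("CREATE TABLE demo (x INTEGER)"))
    con.execute(text("DROP TABLE IF EXISTS demo;"))
```
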
9 changes: 2 additions & 7 deletions df_to_azure/export.py
@@ -9,8 +9,7 @@
 from azure.storage.blob import BlobServiceClient
 from pandas import CategoricalDtype, DataFrame
 from pandas.api.types import is_bool_dtype, is_datetime64_any_dtype, is_float_dtype, is_integer_dtype, is_string_dtype
-from sqlalchemy.sql.visitors import VisitableType
-from sqlalchemy.types import BigInteger, Boolean, DateTime, Integer, Numeric, String
+from sqlalchemy.types import BigInteger, Boolean, DateTime, Integer, Numeric, String, TypeEngine
 
 from df_to_azure.adf import ADF
 from df_to_azure.db import SqlUpsert, auth_azure, execute_stmt
@@ -34,7 +33,6 @@ def df_to_azure(
     clean_staging=True,
     container_name="parquet",
 ):
-
     if parquet:
         DfToParquet(
             df=df,
@@ -96,13 +94,11 @@ def __init__(
         self.clean_staging = clean_staging
 
     def run(self):
-
         if self.df.empty:
             logging.info("Data empty, no new records to upload.")
             return None, None
 
         if self.create:
-
             # azure components
             self.create_resourcegroup()
             self.create_datafactory()
@@ -133,11 +129,10 @@ def run(self):
 
     def _checks(self):
         if self.dtypes:
-            if not all([type(given_type) == VisitableType for given_type in self.dtypes.keys()]):
+            if not all([type(given_type) == TypeEngine for given_type in self.dtypes.keys()]):
                 WrongDtypeError("Wrong dtype given, only SqlAlchemy types are accepted")
 
     def upload_dataset(self):
-
         if self.method == "create":
             self.create_schema()
             self.push_to_azure()
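
SQLAlchemy 2.0 dropped `sqlalchemy.sql.visitors.VisitableType`, so the dtype check now goes through `sqlalchemy.types.TypeEngine`, the common base class of all SQLAlchemy column types. Note that a strict `type(x) == TypeEngine` comparison matches only the base class itself; a sketch of the looser check one would typically write (a hypothetical helper, not code from this PR):

```python
# Sketch: validate that user-supplied dtype values are SQLAlchemy types.
# isinstance() accepts instances of subclasses such as String or Numeric,
# and the issubclass() branch covers uninstantiated type classes.
from sqlalchemy.types import Numeric, String, TypeEngine

def all_sqlalchemy_types(dtypes: dict) -> bool:
    return all(
        isinstance(t, TypeEngine) or (isinstance(t, type) and issubclass(t, TypeEngine))
        for t in dtypes.values()
    )

assert all_sqlalchemy_types({"amount": Numeric(18, 2), "name": String(255)})
```
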
1 change: 0 additions & 1 deletion df_to_azure/tests/test_append.py
@@ -10,7 +10,6 @@
 # #### APPEND METHOD TESTS ####
 # #############################
 def test_append():
-
     df = DataFrame({"A": [1, 2, 3], "B": list("abc"), "C": [4.0, 5.0, nan]})
 
     # 1. we create a new dataframe
3 changes: 2 additions & 1 deletion df_to_azure/tests/test_zz_clean_up.py
@@ -1,4 +1,5 @@
 from df_to_azure.db import auth_azure
+from sqlalchemy.sql import text
 
 
 # --- CLEAN UP ----
@@ -36,5 +37,5 @@ def test_clean_up_db():
     with con.begin():
         for schema, tables in tables_dict.items():
             for table in tables:
-                query = f"DROP TABLE IF EXISTS {schema}.{table};"
+                query = text(f"DROP TABLE IF EXISTS {schema}.{table};")
                 con.execute(query)
16 changes: 8 additions & 8 deletions requirements.txt
@@ -1,8 +1,8 @@
-azure-identity>=1.7.1
-azure-mgmt-datafactory>=2.2.0,<2.7.0
-azure-mgmt-resource>=20.1.0
-azure-storage-blob>=12.8.1
-pandas>=1.5.0
-pyarrow>=7.0.0
-pyodbc>=4.0.32
-sqlalchemy>=1.4.31,<2.0.0
+azure-identity>=1.12.0
+azure-mgmt-datafactory>=7.1.0
+azure-mgmt-resource>=23.1.1
+azure-storage-blob>=12.20.0
+pandas>=2.2.2
+pyarrow>=16.1.0
+pyodbc>=5.1.0
+sqlalchemy>=2.0.30
2 changes: 2 additions & 0 deletions ruff.toml
@@ -0,0 +1,2 @@
+# Set the maximum line length to 120.
+line-length = 120
21 changes: 11 additions & 10 deletions setup.cfg
@@ -1,6 +1,6 @@
 [metadata]
 name = df_to_azure
-version = 0.9.1
+version = 1.0.0
 author = Zypp
 author_email = hello@zypp.io
 description = Automatically write pandas DataFrames to SQL by creating pipelines in Azure Data Factory with copy activity from blob to SQL
@@ -18,16 +18,17 @@ classifiers =
 
 [options]
 packages = df_to_azure
-python_requires = >=3.7
+python_requires = >=3.9
 install_requires =
-    azure-identity>=1.7.1
-    azure-mgmt-datafactory>=2.2.0,<2.7.0
-    azure-mgmt-resource>=20.1.0
-    azure-storage-blob>=12.8.1
-    pandas>=1.5.0
-    pyarrow>=7.0.0
-    pyodbc>=4.0.32
-    sqlalchemy>=1.4.31,<2.0.0
+    azure-identity>=1.12.0
+    azure-mgmt-datafactory>=7.1.0
+    azure-mgmt-resource>=23.1.1
+    azure-storage-blob>=12.20.0
+    pandas>=2.2.2
+    pyarrow>=16.1.0
+    pyodbc>=5.1.0
+    sqlalchemy>=2.0.30