Merge pull request #131 from zypp-io/development
Release 1.0.0
erfannariman authored Jun 5, 2024
2 parents 715aac3 + 349d1e9 commit 4773b29
Showing 11 changed files with 105 additions and 67 deletions.
27 changes: 11 additions & 16 deletions .pre-commit-config.yaml
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.3.0
rev: v4.5.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
@@ -12,21 +12,16 @@ repos:
- id: name-tests-test
args: [--pytest-test-first]
- id: requirements-txt-fixer
- repo: https://github.com/pycqa/flake8
rev: 5.0.4
hooks:
- id: flake8
args: ["--statistics", "--count", "--max-complexity=10", "--max-line-length=120", "--per-file-ignore=__init__.py: F401"]
- repo: https://github.com/psf/black
rev: 22.3.0
hooks:
- id: black
args: [--line-length=120]
- repo: https://github.com/PyCQA/isort
rev: 5.12.0
hooks:
- id: isort
args: ["--profile", "black", --line-length=120]
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.4.4
hooks:
# Run the linter.
- id: ruff
args: [ --fix ]
# Run the formatter.
- id: ruff-format
args: [ --line-length=120 ]
- repo: local
hooks:
- id: check-requirements
45 changes: 45 additions & 0 deletions CHANGELOG.md
@@ -15,3 +15,48 @@ Changelog
# 0.5.2

- Removed the required environment variables `ls_sql_name` and `ls_blob_name`, since these names are now standard.

# 0.5.3

- Added `dtype` argument, so users can specify their own SqlAlchemy dtype for certain columns.

# 0.5.4

- Set requirements to specific versions

# 0.6.0

- Always create linked services for blob and SQL, so the user can switch source blob storages and sink databases more easily.
- Use the environment variable `AZURE_STORAGE_CONNECTION_STRING` for parquet uploads
- If the parquet upload is a single file, place it in the root of the folder

# 0.7.0

- Add upsert for parquet files
- Set logging level to debug for queries
- Fix bugs and add requirements checks
- Fix bug with the staging schema when upserting
- Add support for all pandas int dtypes
- Add customisable container name

# 0.8.0

- Upgrade dependency packages
- Fix failing pipeline because of removed staging schema

# 0.9.0

- Add more dtypes
- Upgrade package version
- Fix bug when dataframe is empty

# 0.9.1

- Fix bug with categorical dtype
- Make pyodbc driver dynamic

# 1.0.0

- Upgrade packages and set minimal versions
- Fix code to work with upgraded packages
- Export to parquet on storage instead of csv
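
As a rough illustration of the 1.0.0 notes above, a call now stages the DataFrame as parquet on blob storage before the ADF copy activity runs. The sketch below is not taken from the repository: only `parquet`, `clean_staging` and `container_name` appear in the diff further down, and the other keyword names (`tablename`, `schema`, `method`, `id_field`) are assumptions to be checked against the package README.

```python
# Minimal, assumed usage sketch of df_to_azure 1.0.0; keyword names other than
# parquet/clean_staging/container_name are assumptions, not confirmed by this diff.
import pandas as pd

from df_to_azure import df_to_azure

df = pd.DataFrame({"id": [1, 2, 3], "amount": [10.0, 20.5, 30.25]})

df_to_azure(
    df=df,
    tablename="sales",   # assumed keyword
    schema="staging",    # assumed keyword
    method="upsert",     # assumed keyword; "create", "append" or "upsert"
    id_field="id",       # assumed keyword; merge key used by the upsert procedure
)
```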
4 changes: 2 additions & 2 deletions df_to_azure/__init__.py
@@ -1,8 +1,8 @@
import logging

from .export import df_to_azure
from .export import df_to_azure as df_to_azure

__version__ = "0.9.1"
__version__ = "1.0.0"

logging.basicConfig(
format="%(asctime)s.%(msecs)03d [%(levelname)-5s] [%(name)s] - %(message)s",
27 changes: 10 additions & 17 deletions df_to_azure/adf.py
@@ -19,6 +19,7 @@
Factory,
LinkedServiceReference,
LinkedServiceResource,
ParquetFormat,
PipelineResource,
SecureString,
SqlServerStoredProcedureActivity,
@@ -173,29 +174,20 @@ def create_linked_service_blob(self):
def create_input_blob(self):
ds_name = f"BLOB_dftoazure_{self.table_name}"

ds_ls = LinkedServiceReference(reference_name=self.ls_blob_name)
ds_ls = LinkedServiceReference(type="LinkedServiceReference", reference_name=self.ls_blob_name)
ds_azure_blob = AzureBlobDataset(
linked_service_name=ds_ls,
folder_path=f"dftoazure/{self.table_name}",
file_name=f"{self.table_name}.csv",
format={
"type": "TextFormat",
"columnDelimiter": "^",
"rowDelimiter": "\n",
"treatEmptyAsNull": "true",
"skipLineCount": 0,
"firstRowAsHeader": "true",
"quoteChar": '"',
},
file_name=f"{self.table_name}.parquet",
format=ParquetFormat(),
)
ds_azure_blob = DatasetResource(properties=ds_azure_blob)
self.adf_client.datasets.create_or_update(self.rg_name, self.df_name, ds_name, ds_azure_blob)

def create_output_sql(self):

ds_name = f"SQL_dftoazure_{self.table_name}"

ds_ls = LinkedServiceReference(reference_name=self.ls_sql_name)
ds_ls = LinkedServiceReference(type="LinkedServiceReference", reference_name=self.ls_sql_name)
data_azure_sql = AzureSqlTableDataset(
linked_service_name=ds_ls,
table_name=f"{self.schema}.{self.table_name}",
@@ -204,7 +196,6 @@ def create_output_sql(self):
self.adf_client.datasets.create_or_update(self.rg_name, self.df_name, ds_name, data_azure_sql)

def create_pipeline(self, pipeline_name):

activities = [self.create_copy_activity()]
# If user wants to upsert, we append stored procedure activity to pipeline.
if self.method == "upsert":
@@ -226,8 +217,8 @@ def create_copy_activity(self):
blob_source = BlobSource()
sql_sink = SqlSink()

ds_in_ref = DatasetReference(reference_name=f"BLOB_dftoazure_{self.table_name}")
ds_out_ref = DatasetReference(reference_name=f"SQL_dftoazure_{self.table_name}")
ds_in_ref = DatasetReference(type="DatasetReference", reference_name=f"BLOB_dftoazure_{self.table_name}")
ds_out_ref = DatasetReference(type="DatasetReference", reference_name=f"SQL_dftoazure_{self.table_name}")
copy_activity = CopyActivity(
name=act_name,
inputs=[ds_in_ref],
@@ -243,7 +234,9 @@ def stored_procedure_activity(self):
dependency = ActivityDependency(
activity=f"Copy {self.table_name} to SQL", dependency_conditions=[dependency_condition]
)
linked_service_reference = LinkedServiceReference(reference_name=self.ls_sql_name)
linked_service_reference = LinkedServiceReference(
type="LinkedServiceReference", reference_name=self.ls_sql_name
)
activity = SqlServerStoredProcedureActivity(
stored_procedure_name=f"UPSERT_{self.table_name}",
name="UPSERT procedure",
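
A recurring change in this file is that reference objects are now constructed with an explicit `type` discriminator, which the upgraded azure-mgmt-datafactory models appear to require. A condensed sketch of that pattern, using the classes imported in the diff above and placeholder names:

```python
# Sketch of the explicit-discriminator pattern shown in the diff; names are placeholders.
from azure.mgmt.datafactory.models import DatasetReference, LinkedServiceReference

ls_ref = LinkedServiceReference(
    type="LinkedServiceReference",  # passed explicitly alongside reference_name with the upgraded SDK
    reference_name="LS_blob",
)
ds_ref = DatasetReference(
    type="DatasetReference",
    reference_name="BLOB_dftoazure_sales",
)
```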
5 changes: 2 additions & 3 deletions df_to_azure/db.py
@@ -49,11 +49,11 @@ def create_merge_query(self):
"""
logging.debug(query)

return query
return text(query)

def drop_procedure(self):
query = f"DROP PROCEDURE IF EXISTS [UPSERT_{self.table_name}];"
return query
return text(query)

def create_stored_procedure(self):
with auth_azure() as con:
@@ -73,7 +73,6 @@ def create_stored_procedure(self):


def auth_azure(driver: str = None):

if driver is None:
import pyodbc

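
The `text()` wrappers above are the standard SQLAlchemy 2.0 migration step: `Connection.execute()` no longer accepts a bare SQL string, so raw statements are wrapped in `sqlalchemy.text()`. A standalone sketch of the idiom, with a placeholder connection URL:

```python
# SQLAlchemy 2.0 idiom for executing raw SQL; the connection URL is a placeholder.
from sqlalchemy import create_engine, text

engine = create_engine("mssql+pyodbc://user:password@my_dsn")  # placeholder

with engine.begin() as con:
    # Passing a bare string to execute() raises ObjectNotExecutableError in 2.0;
    # text() turns it into an executable TextClause.
    con.execute(text("DROP PROCEDURE IF EXISTS [UPSERT_sales];"))
```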
21 changes: 12 additions & 9 deletions df_to_azure/export.py
@@ -9,8 +9,7 @@
from azure.storage.blob import BlobServiceClient
from pandas import CategoricalDtype, DataFrame
from pandas.api.types import is_bool_dtype, is_datetime64_any_dtype, is_float_dtype, is_integer_dtype, is_string_dtype
from sqlalchemy.sql.visitors import VisitableType
from sqlalchemy.types import BigInteger, Boolean, DateTime, Integer, Numeric, String
from sqlalchemy.types import BigInteger, Boolean, DateTime, Integer, Numeric, String, TypeEngine

from df_to_azure.adf import ADF
from df_to_azure.db import SqlUpsert, auth_azure, execute_stmt
@@ -34,7 +33,6 @@ def df_to_azure(
clean_staging=True,
container_name="parquet",
):

if parquet:
DfToParquet(
df=df,
@@ -96,13 +94,11 @@ def __init__(
self.clean_staging = clean_staging

def run(self):

if self.df.empty:
logging.info("Data empty, no new records to upload.")
return None, None

if self.create:

# azure components
self.create_resourcegroup()
self.create_datafactory()
@@ -133,11 +129,10 @@ def run(self):

def _checks(self):
if self.dtypes:
if not all([type(given_type) == VisitableType for given_type in self.dtypes.keys()]):
if not all([type(given_type) == TypeEngine for given_type in self.dtypes.keys()]):
WrongDtypeError("Wrong dtype given, only SqlAlchemy types are accepted")

def upload_dataset(self):

if self.method == "create":
self.create_schema()
self.push_to_azure()
@@ -183,10 +178,18 @@ def upload_to_blob(self):
blob_client = self.blob_service_client()
blob_client = blob_client.get_blob_client(
container="dftoazure",
blob=f"{self.table_name}/{self.table_name}.csv",
blob=f"{self.table_name}/{self.table_name}.parquet",
)

data = self.df.to_csv(index=False, sep="^", quotechar='"', lineterminator="\n")
# This is needed because ADF converts datetime to Unix Epoch
# resulting in INT64 type,
# which conflicts with our Datetime column in the database
# https://shorturl.at/dtSm6
datetime_dtypes = self.df.select_dtypes("datetime")
if datetime_dtypes.empty is False:
for col in datetime_dtypes.columns:
self.df[col] = self.df[col].astype(str)
data = self.df.to_parquet(index=False)
blob_client.upload_blob(data, overwrite=True)

def create_schema(self):
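
Taken together, the new `upload_to_blob` path stringifies datetime columns and writes the DataFrame to the blob as parquet bytes, so the ADF copy activity does not map datetimes to Unix-epoch INT64 values. A self-contained sketch of that flow, assuming `AZURE_STORAGE_CONNECTION_STRING` is set; the container name follows the diff, the blob name is a placeholder:

```python
# Self-contained sketch of the parquet upload path shown above; the blob name is a placeholder.
import os

import pandas as pd
from azure.storage.blob import BlobServiceClient

df = pd.DataFrame({"id": [1, 2], "created_at": pd.to_datetime(["2024-06-01", "2024-06-02"])})

# Cast datetime columns to string so they survive the ADF copy as text (see the comment in the diff).
for col in df.select_dtypes("datetime").columns:
    df[col] = df[col].astype(str)

data = df.to_parquet(index=False)  # returns parquet bytes; requires pyarrow

service = BlobServiceClient.from_connection_string(os.environ["AZURE_STORAGE_CONNECTION_STRING"])
blob_client = service.get_blob_client(container="dftoazure", blob="sales/sales.parquet")
blob_client.upload_blob(data, overwrite=True)
```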
1 change: 0 additions & 1 deletion df_to_azure/tests/test_append.py
@@ -10,7 +10,6 @@
# #### APPEND METHOD TESTS ####
# #############################
def test_append():

df = DataFrame({"A": [1, 2, 3], "B": list("abc"), "C": [4.0, 5.0, nan]})

# 1. we create a new dataframe
3 changes: 2 additions & 1 deletion df_to_azure/tests/test_zz_clean_up.py
@@ -1,4 +1,5 @@
from df_to_azure.db import auth_azure
from sqlalchemy.sql import text


# --- CLEAN UP ----
@@ -36,5 +37,5 @@ def test_clean_up_db():
with con.begin():
for schema, tables in tables_dict.items():
for table in tables:
query = f"DROP TABLE IF EXISTS {schema}.{table};"
query = text(f"DROP TABLE IF EXISTS {schema}.{table};")
con.execute(query)
16 changes: 8 additions & 8 deletions requirements.txt
@@ -1,8 +1,8 @@
azure-identity>=1.7.1
azure-mgmt-datafactory>=2.2.0,<2.7.0
azure-mgmt-resource>=20.1.0
azure-storage-blob>=12.8.1
pandas>=1.5.0
pyarrow>=7.0.0
pyodbc>=4.0.32
sqlalchemy>=1.4.31,<2.0.0
azure-identity>=1.12.0
azure-mgmt-datafactory>=7.1.0
azure-mgmt-resource>=23.1.1
azure-storage-blob>=12.20.0
pandas>=2.2.2
pyarrow>=16.1.0
pyodbc>=5.1.0
sqlalchemy>=2.0.30
2 changes: 2 additions & 0 deletions ruff.toml
@@ -0,0 +1,2 @@
# Set the maximum line length to 120.
line-length = 120
21 changes: 11 additions & 10 deletions setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = df_to_azure
version = 0.9.1
version = 1.0.0
author = Zypp
author_email = hello@zypp.io
description = Automatically write pandas DataFrames to SQL by creating pipelines in Azure Data Factory with copy activity from blob to SQL
@@ -18,16 +18,17 @@ classifiers =

[options]
packages = df_to_azure
python_requires = >=3.7
python_requires = >=3.9
install_requires =
azure-identity>=1.7.1
azure-mgmt-datafactory>=2.2.0,<2.7.0
azure-mgmt-resource>=20.1.0
azure-storage-blob>=12.8.1
pandas>=1.5.0
pyarrow>=7.0.0
pyodbc>=4.0.32
sqlalchemy>=1.4.31,<2.0.0
azure-identity>=1.12.0
azure-mgmt-datafactory>=7.1.0
azure-mgmt-resource>=23.1.1
azure-storage-blob>=12.20.0
pandas>=2.2.2
pyarrow>=16.1.0
pyodbc>=5.1.0
sqlalchemy>=2.0.30



