Skip to content

Commit

Permalink
Make current setup work with updated packages
Browse files Browse the repository at this point in the history
  • Loading branch information
TimvdHeijden committed May 28, 2024
1 parent 280d221 commit 90aa73d
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 11 deletions.
12 changes: 7 additions & 5 deletions df_to_azure/adf.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def create_linked_service_blob(self):
def create_input_blob(self):
ds_name = f"BLOB_dftoazure_{self.table_name}"

ds_ls = LinkedServiceReference(reference_name=self.ls_blob_name)
ds_ls = LinkedServiceReference(type="LinkedServiceReference", reference_name=self.ls_blob_name)
ds_azure_blob = AzureBlobDataset(
linked_service_name=ds_ls,
folder_path=f"dftoazure/{self.table_name}",
Expand All @@ -194,7 +194,7 @@ def create_input_blob(self):
def create_output_sql(self):
ds_name = f"SQL_dftoazure_{self.table_name}"

ds_ls = LinkedServiceReference(reference_name=self.ls_sql_name)
ds_ls = LinkedServiceReference(type="LinkedServiceReference", reference_name=self.ls_sql_name)
data_azure_sql = AzureSqlTableDataset(
linked_service_name=ds_ls,
table_name=f"{self.schema}.{self.table_name}",
Expand Down Expand Up @@ -224,8 +224,8 @@ def create_copy_activity(self):
blob_source = BlobSource()
sql_sink = SqlSink()

ds_in_ref = DatasetReference(reference_name=f"BLOB_dftoazure_{self.table_name}")
ds_out_ref = DatasetReference(reference_name=f"SQL_dftoazure_{self.table_name}")
ds_in_ref = DatasetReference(type="DatasetReference", reference_name=f"BLOB_dftoazure_{self.table_name}")
ds_out_ref = DatasetReference(type="DatasetReference", reference_name=f"SQL_dftoazure_{self.table_name}")
copy_activity = CopyActivity(
name=act_name,
inputs=[ds_in_ref],
Expand All @@ -241,7 +241,9 @@ def stored_procedure_activity(self):
dependency = ActivityDependency(
activity=f"Copy {self.table_name} to SQL", dependency_conditions=[dependency_condition]
)
linked_service_reference = LinkedServiceReference(reference_name=self.ls_sql_name)
linked_service_reference = LinkedServiceReference(
type="LinkedServiceReference", reference_name=self.ls_sql_name
)
activity = SqlServerStoredProcedureActivity(
stored_procedure_name=f"UPSERT_{self.table_name}",
name="UPSERT procedure",
Expand Down
4 changes: 2 additions & 2 deletions df_to_azure/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ def create_merge_query(self):
"""
logging.debug(query)

return query
return text(query)

def drop_procedure(self):
query = f"DROP PROCEDURE IF EXISTS [UPSERT_{self.table_name}];"
return query
return text(query)

def create_stored_procedure(self):
with auth_azure() as con:
Expand Down
5 changes: 2 additions & 3 deletions df_to_azure/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
from azure.storage.blob import BlobServiceClient
from pandas import CategoricalDtype, DataFrame
from pandas.api.types import is_bool_dtype, is_datetime64_any_dtype, is_float_dtype, is_integer_dtype, is_string_dtype
from sqlalchemy.sql.visitors import VisitableType
from sqlalchemy.types import BigInteger, Boolean, DateTime, Integer, Numeric, String
from sqlalchemy.types import BigInteger, Boolean, DateTime, Integer, Numeric, String, TypeEngine

from df_to_azure.adf import ADF
from df_to_azure.db import SqlUpsert, auth_azure, execute_stmt
Expand Down Expand Up @@ -130,7 +129,7 @@ def run(self):

def _checks(self):
if self.dtypes:
if not all([type(given_type) == VisitableType for given_type in self.dtypes.keys()]):
if not all([type(given_type) == TypeEngine for given_type in self.dtypes.keys()]):
WrongDtypeError("Wrong dtype given, only SqlAlchemy types are accepted")

def upload_dataset(self):
Expand Down
3 changes: 2 additions & 1 deletion df_to_azure/tests/test_zz_clean_up.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from df_to_azure.db import auth_azure
from sqlalchemy.sql import text


# --- CLEAN UP ----
Expand Down Expand Up @@ -36,5 +37,5 @@ def test_clean_up_db():
with con.begin():
for schema, tables in tables_dict.items():
for table in tables:
query = f"DROP TABLE IF EXISTS {schema}.{table};"
query = text(f"DROP TABLE IF EXISTS {schema}.{table};")
con.execute(query)

0 comments on commit 90aa73d

Please sign in to comment.