From c0f00526c34407bfd53c2c8b8e5b67586bee5670 Mon Sep 17 00:00:00 2001 From: Tim van der Heijden Date: Mon, 16 May 2022 15:13:44 +0200 Subject: [PATCH 1/3] Fix bug for staging schema with upsert --- .pre-commit-config.yaml | 2 +- data/salesfoce_opportunity_merged.pickle | Bin 0 -> 1478 bytes df_to_azure/db.py | 14 ++++- df_to_azure/exceptions.py | 6 ++ df_to_azure/export.py | 18 ++++++ df_to_azure/tests/test_upsert.py | 70 +++++++++++++++++++++++ 6 files changed, 107 insertions(+), 3 deletions(-) create mode 100644 data/salesfoce_opportunity_merged.pickle diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 905d643..1862b7d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -11,7 +11,7 @@ repos: hooks: - id: flake8 - repo: https://github.com/psf/black - rev: 21.6b0 + rev: 22.3.0 hooks: - id: black args: [--line-length=120] diff --git a/data/salesfoce_opportunity_merged.pickle b/data/salesfoce_opportunity_merged.pickle new file mode 100644 index 0000000000000000000000000000000000000000..e601a8dd0e99231f5fd936ca946b89fb2e375e20 GIT binary patch literal 1478 zcmZuxUr!T35O0x+6s?iOKLj-weXu^XX$vYu6D^<-tF4HFF)^3zb=zII+dKF6P!dfv z5!7VgSp6uz_$B-pehPJVZwoED$=&Si%+8;k-^}~m*B`lzS@KkbSO>*rFXXt~Jig$E-sYo}S9@Trn=ns= zp%22b2ysD?ug}l_-%?v&2i0+gU%Q3@@MTh{BH7a|mk=ap;*|z$mjW@E5d?UN7$XS`|i0ugL zOCzm0_2qLaKO(blRkPWA@CTOpHEu%$UtYr7Ojat_XP#GJRjcc-wyG=jMWqj`m6kc= zvu|3-N_`pJuO@MXAtafvRcf_TwN{#`8m2m5s#Ht0*+Y`4B`gs2IgTMa=V92Rv7mU< zxxb>C+QqfhZ)G8xfBbBv{zA!#1t^^J2tjFLh4mv%4m=WUF{d7=xm_Lx?WEhq9C9sw zdz=Dd|DUzHqn3_YI&Nv+(g{nCTYAFMf~AT$a)b^&cHtuv zq;ODnGHEMLW{Tg6*0Lt=KbGr;-S~AU<$oLcb8;~~&y??&2)Y9ikh$SP8vi*{{+rtW zppy|}J(D?N>8Pc)e=^4`ugHx?ZbuPgKqs)PG&If)qr8LP;JDmIF?3np)ev81a0Ue+ zPi3yZH~rRu9#rHEnbJ5V(|wel1m?0qU7;04Ar}dwV!K2;V6g6aVZwxAF0_Iz#Ci^8 z4o-M@C3YgroehMzv)Wo$a{54?aLf)#FFg$f^05=9Mi64aIzx&|I>So;dV+4fXHqS! z`Uw@3^37|RFA&}T3bc6YHW0PPV|Cv<*>6(dw+M1mHUzzrMCBMwX_1S^ykQW*v z{nCH8t)d~2?L&Y>sd215!1RQf0z*v~wLfIt09xBr`6bLJBn0#;47j6i*GE0GJ&Sxl Yd>y-;B=V^T9qsz4jbV^}j+1uzFI{i@SpWb4 literal 0 HcmV?d00001 diff --git a/df_to_azure/db.py b/df_to_azure/db.py index 1a311bd..d4aa862 100644 --- a/df_to_azure/db.py +++ b/df_to_azure/db.py @@ -2,8 +2,11 @@ import os from urllib.parse import quote_plus +import sqlalchemy.exc from sqlalchemy import create_engine +from df_to_azure.exceptions import UpsertError + class SqlUpsert: def __init__(self, table_name, schema, id_cols, columns): @@ -57,8 +60,15 @@ def create_stored_procedure(self): query_drop_procedure = self.drop_procedure() con.execute(query_drop_procedure) query_create_merge = self.create_merge_query() - con.execute(query_create_merge) - t.commit() + try: + con.execute(query_create_merge) + t.commit() + except sqlalchemy.exc.ProgrammingError: + raise UpsertError( + "During upsert there has been an issue. One of the sources could be that the table in" + " staging has columns that do not match the table you want to upsert. Remove the " + f"staging table {self.table_name} manually in that case" + ) def auth_azure(): diff --git a/df_to_azure/exceptions.py b/df_to_azure/exceptions.py index d0a23d3..121e88d 100644 --- a/df_to_azure/exceptions.py +++ b/df_to_azure/exceptions.py @@ -28,3 +28,9 @@ class DoubleColumnNamesError(Exception): """For writing to Azure we do not accept double column names""" pass + + +class UpsertError(Exception): + """For the moment upsert want to use a staging table that has the same name, but other columns""" + + pass diff --git a/df_to_azure/export.py b/df_to_azure/export.py index 6a60ec0..12a1314 100644 --- a/df_to_azure/export.py +++ b/df_to_azure/export.py @@ -31,6 +31,7 @@ def df_to_azure( create=False, dtypes=None, parquet=False, + clean_staging=True, ): if parquet: @@ -49,6 +50,7 @@ def df_to_azure( decimal_precision=decimal_precision, create=create, dtypes=dtypes, + clean_staging=clean_staging, ).run() return adf_client, run_response @@ -68,6 +70,7 @@ def __init__( decimal_precision: int = 2, create: bool = False, dtypes: dict = None, + clean_staging: bool = True, ): super().__init__( df=df, @@ -82,6 +85,7 @@ def __init__( self.text_length = text_length self.decimal_precision = decimal_precision self.dtypes = dtypes + self.clean_staging = clean_staging def run(self): if self.create: @@ -103,6 +107,10 @@ def run(self): run_response = self.create_pipeline(pipeline_name=self.pipeline_name) if self.wait_till_finished: wait_until_pipeline_is_done(self.adf_client, run_response) + if self.clean_staging & (self.method == "upsert"): + # If you used clean_staging=False before and the upsert gives errors on unknown columns -> remove table in + # staging manually + self.clean_staging_after_upsert() return self.adf_client, run_response @@ -254,6 +262,16 @@ def check_for_bigint(self): return update_dict_bigint + def clean_staging_after_upsert(self): + """ + Function to drop the table created in staging for the upsert. This function prevents issues with unmatchable + columns when doing upsert of different data with the same name. + """ + query = f""" + DROP TABLE IF EXISTS staging.{self.table_name} + """ + execute_stmt(query) + class DfToParquet: """ diff --git a/df_to_azure/tests/test_upsert.py b/df_to_azure/tests/test_upsert.py index fc6a1fe..975ba8f 100644 --- a/df_to_azure/tests/test_upsert.py +++ b/df_to_azure/tests/test_upsert.py @@ -6,6 +6,7 @@ from df_to_azure import df_to_azure from df_to_azure.db import auth_azure +from df_to_azure.exceptions import UpsertError from ..tests import data @@ -166,3 +167,72 @@ def test_upsert_spaces_column_name(file_dir="data"): result = read_sql_table(table_name="sample_spaces_column_name", con=con, schema="test") assert_frame_equal(expected, result) + + +@pytest.mark.parametrize("clean_staging", [True, False]) +def test_upsert_same_tablename(clean_staging): + """ + Test upsert with the same table name but different columns. Work with clean_staging=True and fails when staging + is not cleaned. + """ + df1 = data["sample_1"] + df_to_azure( + df=df1, + tablename="sample", + schema="test", + method="create", + wait_till_finished=True, + ) + + df2 = data["sample_2"] + df_to_azure( + df=df2, + tablename="sample", + schema="test", + method="upsert", + id_field="col_a", + wait_till_finished=True, + clean_staging=clean_staging, + ) + + new_col_names = {"col_a": "test_a", "col_b": "test_b", "col_c": "test_c"} + df1 = df1.rename(columns=new_col_names) + df2 = df2.rename(columns=new_col_names) + df_to_azure( + df=df1, + tablename="sample", + schema="test", + method="create", + wait_till_finished=True, + ) + if clean_staging: + df_to_azure( + df=df2, + tablename="sample", + schema="test", + method="upsert", + id_field="test_a", + wait_till_finished=True, + ) + expected = DataFrame( + { + "test_a": [1, 3, 4, 5, 6], + "test_b": ["updated value", "test", "test", "new value", "also new"], + "test_c": ["E", "Z", "A", "F", "H"], + } + ) + + with auth_azure() as con: + result = read_sql_table(table_name="sample", con=con, schema="test") + + assert_frame_equal(expected, result) + else: + with pytest.raises(UpsertError): + df_to_azure( + df=df2, + tablename="sample", + schema="test", + method="upsert", + id_field="test_a", + wait_till_finished=True, + ) From d0ee679426561efa7702eb44a1e7cc559d2953a0 Mon Sep 17 00:00:00 2001 From: Tim van der Heijden Date: Mon, 16 May 2022 15:15:36 +0200 Subject: [PATCH 2/3] Change import --- df_to_azure/db.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/df_to_azure/db.py b/df_to_azure/db.py index d4aa862..8abaddd 100644 --- a/df_to_azure/db.py +++ b/df_to_azure/db.py @@ -2,8 +2,8 @@ import os from urllib.parse import quote_plus -import sqlalchemy.exc from sqlalchemy import create_engine +from sqlalchemy.exc import ProgrammingError from df_to_azure.exceptions import UpsertError @@ -63,7 +63,7 @@ def create_stored_procedure(self): try: con.execute(query_create_merge) t.commit() - except sqlalchemy.exc.ProgrammingError: + except ProgrammingError: raise UpsertError( "During upsert there has been an issue. One of the sources could be that the table in" " staging has columns that do not match the table you want to upsert. Remove the " From 82b007933ab07972c392248ce895c94912aa96f0 Mon Sep 17 00:00:00 2001 From: Tim van der Heijden Date: Mon, 16 May 2022 15:16:47 +0200 Subject: [PATCH 3/3] Change exception --- df_to_azure/exceptions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/df_to_azure/exceptions.py b/df_to_azure/exceptions.py index 121e88d..254a050 100644 --- a/df_to_azure/exceptions.py +++ b/df_to_azure/exceptions.py @@ -31,6 +31,6 @@ class DoubleColumnNamesError(Exception): class UpsertError(Exception): - """For the moment upsert want to use a staging table that has the same name, but other columns""" + """For the moment upsert gives an error""" pass