Skip to content

Commit

Permalink
Merge pull request #111 from zypp-io/fix-bug-staging-at-upsert
Browse files Browse the repository at this point in the history
Fix bug for staging schema with upsert
  • Loading branch information
erfannariman authored May 20, 2022
2 parents 01aa074 + 82b0079 commit aeba034
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ repos:
hooks:
- id: flake8
- repo: https://github.com/psf/black
rev: 21.6b0
rev: 22.3.0
hooks:
- id: black
args: [--line-length=120]
Expand Down
Binary file added data/salesfoce_opportunity_merged.pickle
Binary file not shown.
14 changes: 12 additions & 2 deletions df_to_azure/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
from urllib.parse import quote_plus

from sqlalchemy import create_engine
from sqlalchemy.exc import ProgrammingError

from df_to_azure.exceptions import UpsertError


class SqlUpsert:
Expand Down Expand Up @@ -57,8 +60,15 @@ def create_stored_procedure(self):
query_drop_procedure = self.drop_procedure()
con.execute(query_drop_procedure)
query_create_merge = self.create_merge_query()
con.execute(query_create_merge)
t.commit()
try:
con.execute(query_create_merge)
t.commit()
except ProgrammingError:
raise UpsertError(
"During upsert there has been an issue. One of the sources could be that the table in"
" staging has columns that do not match the table you want to upsert. Remove the "
f"staging table {self.table_name} manually in that case"
)


def auth_azure():
Expand Down
6 changes: 6 additions & 0 deletions df_to_azure/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,9 @@ class DoubleColumnNamesError(Exception):
"""For writing to Azure we do not accept double column names"""

pass


class UpsertError(Exception):
    """Raised when executing the upsert (merge) stored procedure fails."""
18 changes: 18 additions & 0 deletions df_to_azure/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def df_to_azure(
create=False,
dtypes=None,
parquet=False,
clean_staging=True,
):

if parquet:
Expand All @@ -49,6 +50,7 @@ def df_to_azure(
decimal_precision=decimal_precision,
create=create,
dtypes=dtypes,
clean_staging=clean_staging,
).run()

return adf_client, run_response
Expand All @@ -68,6 +70,7 @@ def __init__(
decimal_precision: int = 2,
create: bool = False,
dtypes: dict = None,
clean_staging: bool = True,
):
super().__init__(
df=df,
Expand All @@ -82,6 +85,7 @@ def __init__(
self.text_length = text_length
self.decimal_precision = decimal_precision
self.dtypes = dtypes
self.clean_staging = clean_staging

def run(self):
if self.create:
Expand All @@ -103,6 +107,10 @@ def run(self):
run_response = self.create_pipeline(pipeline_name=self.pipeline_name)
if self.wait_till_finished:
wait_until_pipeline_is_done(self.adf_client, run_response)
if self.clean_staging & (self.method == "upsert"):
# If you used clean_staging=False before and the upsert gives errors on unknown columns -> remove table in
# staging manually
self.clean_staging_after_upsert()

return self.adf_client, run_response

Expand Down Expand Up @@ -254,6 +262,16 @@ def check_for_bigint(self):

return update_dict_bigint

def clean_staging_after_upsert(self):
    """
    Drop the staging table that was created for the upsert.

    Removing ``staging.<table_name>`` after the upsert prevents a later
    upsert under the same table name (possibly with a different column
    set) from failing on a column mismatch against a stale staging table.
    """
    # NOTE(review): table_name is interpolated directly into the SQL text;
    # assumes it is not user-controlled — confirm callers validate it.
    query = f"""
    DROP TABLE IF EXISTS staging.{self.table_name}
    """
    execute_stmt(query)


class DfToParquet:
"""
Expand Down
70 changes: 70 additions & 0 deletions df_to_azure/tests/test_upsert.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from df_to_azure import df_to_azure
from df_to_azure.db import auth_azure
from df_to_azure.exceptions import UpsertError

from ..tests import data

Expand Down Expand Up @@ -166,3 +167,72 @@ def test_upsert_spaces_column_name(file_dir="data"):
result = read_sql_table(table_name="sample_spaces_column_name", con=con, schema="test")

assert_frame_equal(expected, result)


@pytest.mark.parametrize("clean_staging", [True, False])
def test_upsert_same_tablename(clean_staging):
    """
    Upsert into a table name that was previously used with different columns.

    With ``clean_staging=True`` the staging table is dropped after each
    upsert, so the second round succeeds; with ``clean_staging=False`` the
    stale staging table's columns no longer match and ``UpsertError`` is
    raised.
    """
    first_sample = data["sample_1"]
    second_sample = data["sample_2"]

    # Round 1: create the table, then upsert with the parametrized flag.
    df_to_azure(
        df=first_sample,
        tablename="sample",
        schema="test",
        method="create",
        wait_till_finished=True,
    )
    df_to_azure(
        df=second_sample,
        tablename="sample",
        schema="test",
        method="upsert",
        id_field="col_a",
        wait_till_finished=True,
        clean_staging=clean_staging,
    )

    # Round 2: recreate under the same table name with renamed columns.
    renamed = {"col_a": "test_a", "col_b": "test_b", "col_c": "test_c"}
    first_sample = first_sample.rename(columns=renamed)
    second_sample = second_sample.rename(columns=renamed)
    df_to_azure(
        df=first_sample,
        tablename="sample",
        schema="test",
        method="create",
        wait_till_finished=True,
    )

    if not clean_staging:
        # The stale staging table still holds the old column names,
        # so the merge procedure cannot be created and the upsert fails.
        with pytest.raises(UpsertError):
            df_to_azure(
                df=second_sample,
                tablename="sample",
                schema="test",
                method="upsert",
                id_field="test_a",
                wait_till_finished=True,
            )
        return

    df_to_azure(
        df=second_sample,
        tablename="sample",
        schema="test",
        method="upsert",
        id_field="test_a",
        wait_till_finished=True,
    )
    expected = DataFrame(
        {
            "test_a": [1, 3, 4, 5, 6],
            "test_b": ["updated value", "test", "test", "new value", "also new"],
            "test_c": ["E", "Z", "A", "F", "H"],
        }
    )

    with auth_azure() as con:
        result = read_sql_table(table_name="sample", con=con, schema="test")

    assert_frame_equal(expected, result)

0 comments on commit aeba034

Please sign in to comment.