[DOP-8665] - Allow modes "ignore" and "error" in JDBC.WriteOptions #144

Merged: 2 commits, Sep 11, 2023

Changes from all commits
1 change: 1 addition & 0 deletions docs/changelog/next_release/144.feature.rst
@@ -0,0 +1 @@
+Add ``if_exists="ignore"`` and ``error`` to ``JDBC.WriteOptions``
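For context, a minimal usage sketch of the new option; the connection values below are placeholders and are not part of this PR, and `spark`/`df` are assumed to already exist:

```python
from onetl.connection import Postgres
from onetl.db import DBWriter

# Placeholder connection settings, shown only to illustrate the new option.
postgres = Postgres(
    host="localhost",
    user="user",
    password="password",
    database="test",
    spark=spark,  # an existing SparkSession
)

writer = DBWriter(
    connection=postgres,
    target="schema.table",
    # New in this PR: "ignore" skips the write if the table already exists,
    # and "error" raises instead of writing.
    options=Postgres.WriteOptions(if_exists="ignore"),
)
writer.run(df)  # df is an existing Spark DataFrame
```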
6 changes: 5 additions & 1 deletion onetl/connection/db_connection/jdbc_connection/connection.py
@@ -218,7 +218,11 @@ def write_df_to_target(
         write_options = self.WriteOptions.parse(options)
         jdbc_params = self.options_to_jdbc_params(write_options)
 
-        mode = "append" if write_options.if_exists == JDBCTableExistBehavior.APPEND else "overwrite"
+        mode = (
+            "overwrite"
+            if write_options.if_exists == JDBCTableExistBehavior.REPLACE_ENTIRE_TABLE
+            else write_options.if_exists.value
+        )
         log.info("|%s| Saving data to a table %r", self.__class__.__name__, target)
         df.write.jdbc(table=target, mode=mode, **jdbc_params)
         log.info("|%s| Table %r successfully written", self.__class__.__name__, target)
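A standalone sketch of the mapping this hunk implements (the enum is copied from this PR; the helper function is hypothetical, named here only for illustration). Only ``replace_entire_table`` needs renaming, since the other enum values are already valid Spark save-mode names:

```python
from enum import Enum

class JDBCTableExistBehavior(str, Enum):
    APPEND = "append"
    IGNORE = "ignore"
    ERROR = "error"
    REPLACE_ENTIRE_TABLE = "replace_entire_table"

def spark_save_mode(if_exists: JDBCTableExistBehavior) -> str:
    # Hypothetical helper mirroring write_df_to_target: Spark has no
    # "replace_entire_table" save mode, so it maps to "overwrite";
    # "append", "ignore" and "error" pass through unchanged.
    if if_exists == JDBCTableExistBehavior.REPLACE_ENTIRE_TABLE:
        return "overwrite"
    return if_exists.value

assert spark_save_mode(JDBCTableExistBehavior.IGNORE) == "ignore"
assert spark_save_mode(JDBCTableExistBehavior.REPLACE_ENTIRE_TABLE) == "overwrite"
```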
69 changes: 46 additions & 23 deletions onetl/connection/db_connection/jdbc_connection/options.py
@@ -84,6 +84,8 @@

 class JDBCTableExistBehavior(str, Enum):
     APPEND = "append"
+    IGNORE = "ignore"
+    ERROR = "error"
     REPLACE_ENTIRE_TABLE = "replace_entire_table"
 
     def __str__(self) -> str:
@@ -413,44 +415,65 @@ class Config:

         .. dropdown:: Behavior in details

             * Table does not exist
                 Table is created using options provided by user
                 (``createTableOptions``, ``createTableColumnTypes``, etc).

             * Table exists
                 Data is appended to a table. Table has the same DDL as before writing data.

                 .. warning::

                     This mode does not check whether the table already contains
                     rows from the dataframe, so duplicated rows can be created.

                     Also, Spark does not support passing custom options to the
                     insert statement, like ``ON CONFLICT``, so don't try to
                     implement deduplication using unique indexes or constraints.

                     Instead, write to a staging table and perform deduplication
                     using the :obj:`~execute` method.

     * ``replace_entire_table``
         **Table is dropped and then created, or truncated**.

         .. dropdown:: Behavior in details

             * Table does not exist
                 Table is created using options provided by user
                 (``createTableOptions``, ``createTableColumnTypes``, etc).

             * Table exists
                 Table content is replaced with dataframe content.

                 After writing is completed, the target table could either have the same DDL as
                 before writing data (``truncate=True``), or be recreated (``truncate=False``,
                 or if the source does not support truncation).

-    .. note::
-
-        ``error`` and ``ignore`` modes are not supported.
+    * ``ignore``
+        Ignores the write operation if the table already exists.
+
+        .. dropdown:: Behavior in details
+
+            * Table does not exist
+                Table is created using options provided by user
+                (``createTableOptions``, ``createTableColumnTypes``, etc).
+
+            * Table exists
+                The write operation is ignored, and no data is written to the table.
+
+    * ``error``
+        Raises an error if the table already exists.
+
+        .. dropdown:: Behavior in details
+
+            * Table does not exist
+                Table is created using options provided by user
+                (``createTableOptions``, ``createTableColumnTypes``, etc).
+
+            * Table exists
+                An error is raised, and no data is written to the table.
     """

     batchsize: int = 20_000
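The four behaviors documented above correspond one-to-one to Spark's own JDBC save modes. A plain PySpark illustration of that correspondence (not onetl code; the URL and credentials are placeholders):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a")], ["id", "name"])

# Spark's DataFrameWriter natively understands the save modes that
# ``if_exists`` now exposes: "append", "overwrite", "ignore", "error".
df.write.jdbc(
    url="jdbc:postgresql://localhost:5432/test",  # placeholder URL
    table="schema.table",
    mode="ignore",  # skip the write entirely if the table already exists
    properties={"user": "user", "password": "password"},
)
```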
@@ -6,7 +6,17 @@
 pytestmark = pytest.mark.postgres
 
 
-def test_postgres_writer_snapshot(spark, processing, prepare_schema_table):
+@pytest.mark.parametrize(
+    "options",
+    [
+        {},
+        {"if_exists": "append"},
+        {"if_exists": "replace_entire_table"},
+        {"if_exists": "error"},
+        {"if_exists": "ignore"},
+    ],
+)
+def test_postgres_writer_snapshot(spark, processing, get_schema_table, options):
     df = processing.create_spark_df(spark=spark)
 
     postgres = Postgres(
@@ -20,14 +30,15 @@ def test_postgres_writer_snapshot(spark, processing, prepare_schema_table):
 
     writer = DBWriter(
         connection=postgres,
-        target=prepare_schema_table.full_name,
+        target=get_schema_table.full_name,
+        options=Postgres.WriteOptions(**options),
     )
 
     writer.run(df)
 
     processing.assert_equal_df(
-        schema=prepare_schema_table.schema,
-        table=prepare_schema_table.table,
+        schema=get_schema_table.schema,
+        table=get_schema_table.table,
         df=df,
     )
 
@@ -86,7 +97,7 @@ def test_postgres_writer_snapshot_with_pydantic_options(spark, processing, prepare_schema_table):
     )
 
 
-def test_postgres_writer_mode_append(spark, processing, prepare_schema_table):
+def test_postgres_writer_if_exists_append(spark, processing, prepare_schema_table):
     df = processing.create_spark_df(spark=spark, min_id=1, max_id=1500)
     df1 = df[df.id_int < 1001]
     df2 = df[df.id_int > 1000]
@@ -116,7 +127,70 @@ def test_postgres_writer_mode_append(spark, processing, prepare_schema_table):
     )
 
 
-def test_postgres_writer_mode_replace_entire_table(spark, processing, prepare_schema_table):
+def test_postgres_writer_if_exists_error(spark, processing, prepare_schema_table):
+    from pyspark.sql.utils import AnalysisException
+
+    df = processing.create_spark_df(spark=spark, min_id=1, max_id=1500)
+
+    postgres = Postgres(
+        host=processing.host,
+        port=processing.port,
+        user=processing.user,
+        password=processing.password,
+        database=processing.database,
+        spark=spark,
+    )
+
+    writer = DBWriter(
+        connection=postgres,
+        target=prepare_schema_table.full_name,
+        options=Postgres.WriteOptions(if_exists="error"),
+    )
+
+    with pytest.raises(
+        AnalysisException,
+        match=f"Table or view '{prepare_schema_table.full_name}' already exists. SaveMode: ErrorIfExists.",
+    ):
+        writer.run(df)
+
+    empty_df = spark.createDataFrame([], df.schema)
+
+    processing.assert_equal_df(
+        schema=prepare_schema_table.schema,
+        table=prepare_schema_table.table,
+        df=empty_df,
+    )
+
+
+def test_postgres_writer_if_exists_ignore(spark, processing, prepare_schema_table):
+    df = processing.create_spark_df(spark=spark, min_id=1, max_id=1500)
+
+    postgres = Postgres(
+        host=processing.host,
+        port=processing.port,
+        user=processing.user,
+        password=processing.password,
+        database=processing.database,
+        spark=spark,
+    )
+
+    writer = DBWriter(
+        connection=postgres,
+        target=prepare_schema_table.full_name,
+        options=Postgres.WriteOptions(if_exists="ignore"),
+    )
+
+    writer.run(df)  # The write operation is ignored
+    empty_df = spark.createDataFrame([], df.schema)
+
+    processing.assert_equal_df(
+        schema=prepare_schema_table.schema,
+        table=prepare_schema_table.table,
+        df=empty_df,
+    )
+
+
+def test_postgres_writer_if_exists_replace_entire_table(spark, processing, prepare_schema_table):
     df = processing.create_spark_df(spark=spark, min_id=1, max_id=1500)
     df1 = df[df.id_int < 1001]
     df2 = df[df.id_int > 1000]
@@ -266,6 +266,8 @@ def test_jdbc_write_options_to_jdbc(spark_mock):
     [
         ({}, JDBCTableExistBehavior.APPEND),
         ({"if_exists": "append"}, JDBCTableExistBehavior.APPEND),
+        ({"if_exists": "ignore"}, JDBCTableExistBehavior.IGNORE),
+        ({"if_exists": "error"}, JDBCTableExistBehavior.ERROR),
         ({"if_exists": "replace_entire_table"}, JDBCTableExistBehavior.REPLACE_ENTIRE_TABLE),
     ],
 )
@@ -294,6 +296,18 @@ def test_jdbc_write_options_if_exists(options, value):
"Mode `overwrite` is deprecated since v0.9.0 and will be removed in v1.0.0. "
"Use `replace_entire_table` instead",
),
(
{"mode": "ignore"},
JDBCTableExistBehavior.IGNORE,
"Option `WriteOptions(mode=...)` is deprecated since v0.9.0 and will be removed in v1.0.0. "
"Use `WriteOptions(if_exists=...)` instead",
),
(
{"mode": "error"},
JDBCTableExistBehavior.ERROR,
"Option `WriteOptions(mode=...)` is deprecated since v0.9.0 and will be removed in v1.0.0. "
"Use `WriteOptions(if_exists=...)` instead",
),
],
)
def test_jdbc_write_options_mode_deprecated(options, value, message):
@@ -305,10 +319,6 @@ def test_jdbc_write_options_mode_deprecated(options, value, message):
 @pytest.mark.parametrize(
     "options",
     [
-        # disallowed modes
-        {"mode": "error"},
-        {"mode": "ignore"},
-        # wrong mode
         {"mode": "wrong_mode"},
     ],
 )
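The legacy ``mode=`` spelling now accepts the new values but warns, as the parametrization above asserts. A sketch of what that looks like from user code, assuming the deprecation is emitted as a ``UserWarning`` at options-construction time (the import path follows the file layout shown in this diff):

```python
import warnings

from onetl.connection import Postgres
from onetl.connection.db_connection.jdbc_connection.options import JDBCTableExistBehavior

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    # Deprecated alias: ``mode`` maps onto ``if_exists`` and triggers a warning.
    options = Postgres.WriteOptions(mode="ignore")

assert options.if_exists == JDBCTableExistBehavior.IGNORE
assert "deprecated" in str(caught[0].message)
```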