diff --git a/docs/changelog/next_release/144.feature.rst b/docs/changelog/next_release/144.feature.rst
new file mode 100644
index 000000000..a0cf257e4
--- /dev/null
+++ b/docs/changelog/next_release/144.feature.rst
@@ -0,0 +1 @@
+Add ``if_exists="ignore"`` and ``error`` to ``JDBC.WriteOptions``
diff --git a/onetl/connection/db_connection/jdbc_connection/connection.py b/onetl/connection/db_connection/jdbc_connection/connection.py
index 3eb83f538..f5b611910 100644
--- a/onetl/connection/db_connection/jdbc_connection/connection.py
+++ b/onetl/connection/db_connection/jdbc_connection/connection.py
@@ -218,7 +218,11 @@ def write_df_to_target(
         write_options = self.WriteOptions.parse(options)
         jdbc_params = self.options_to_jdbc_params(write_options)
 
-        mode = "append" if write_options.if_exists == JDBCTableExistBehavior.APPEND else "overwrite"
+        mode = (
+            "overwrite"
+            if write_options.if_exists == JDBCTableExistBehavior.REPLACE_ENTIRE_TABLE
+            else write_options.if_exists.value
+        )
         log.info("|%s| Saving data to a table %r", self.__class__.__name__, target)
         df.write.jdbc(table=target, mode=mode, **jdbc_params)
         log.info("|%s| Table %r successfully written", self.__class__.__name__, target)
diff --git a/onetl/connection/db_connection/jdbc_connection/options.py b/onetl/connection/db_connection/jdbc_connection/options.py
index c998055fe..dacaded77 100644
--- a/onetl/connection/db_connection/jdbc_connection/options.py
+++ b/onetl/connection/db_connection/jdbc_connection/options.py
@@ -84,6 +84,8 @@ class JDBCTableExistBehavior(str, Enum):
     APPEND = "append"
+    IGNORE = "ignore"
+    ERROR = "error"
     REPLACE_ENTIRE_TABLE = "replace_entire_table"
 
     def __str__(self) -> str:
         return str(self.value)
@@ -413,44 +415,65 @@ class Config:
 
             .. dropdown:: Behavior in details
 
-            * Table does not exist
-                Table is created using options provided by user
-                (``createTableOptions``, ``createTableColumnTypes``, etc).
+                * Table does not exist
+                    Table is created using options provided by user
+                    (``createTableOptions``, ``createTableColumnTypes``, etc).
 
-            * Table exists
-                Data is appended to a table. Table has the same DDL as before writing data
+                * Table exists
+                    Data is appended to a table. Table has the same DDL as before writing data
 
-                .. warning::
+                    .. warning::
 
-                    This mode does not check whether table already contains
-                    rows from dataframe, so duplicated rows can be created.
+                        This mode does not check whether table already contains
+                        rows from dataframe, so duplicated rows can be created.
 
-                    Also Spark does not support passing custom options to
-                    insert statement, like ``ON CONFLICT``, so don't try to
-                    implement deduplication using unique indexes or constraints.
+                        Also Spark does not support passing custom options to
+                        insert statement, like ``ON CONFLICT``, so don't try to
+                        implement deduplication using unique indexes or constraints.
 
-                    Instead, write to staging table and perform deduplication
-                    using :obj:`~execute` method.
+                        Instead, write to staging table and perform deduplication
+                        using :obj:`~execute` method.
 
         * ``replace_entire_table``
             **Table is dropped and then created, or truncated**.
 
             .. dropdown:: Behavior in details
 
-            * Table does not exist
-                Table is created using options provided by user
-                (``createTableOptions``, ``createTableColumnTypes``, etc).
+                * Table does not exist
+                    Table is created using options provided by user
+                    (``createTableOptions``, ``createTableColumnTypes``, etc).
 
-            * Table exists
-                Table content is replaced with dataframe content.
+                * Table exists
+                    Table content is replaced with dataframe content.
 
-                After writing completed, target table could either have the same DDL as
-                before writing data (``truncate=True``), or can be recreated (``truncate=False``
-                or source does not support truncation).
+                    After writing completed, target table could either have the same DDL as
+                    before writing data (``truncate=True``), or can be recreated (``truncate=False``
+                    or source does not support truncation).
 
-            .. note::
+        * ``ignore``
+            Ignores the write operation if the table already exists.
+
+            .. dropdown:: Behavior in details
+
+                * Table does not exist
+                    Table is created using options provided by user
+                    (``createTableOptions``, ``createTableColumnTypes``, etc).
+
+                * Table exists
+                    The write operation is ignored, and no data is written to the table.
+
+        * ``error``
+            Raises an error if the table already exists.
+
+            .. dropdown:: Behavior in details
+
+                * Table does not exist
+                    Table is created using options provided by user
+                    (``createTableOptions``, ``createTableColumnTypes``, etc).
+
+                * Table exists
+                    An error is raised, and no data is written to the table.
 
-                ``error`` and ``ignore`` modes are not supported.
     """
 
     batchsize: int = 20_000
diff --git a/tests/tests_integration/tests_core_integration/tests_db_writer_integration/test_postgres_writer_integration.py b/tests/tests_integration/tests_core_integration/tests_db_writer_integration/test_postgres_writer_integration.py
index 195b16e02..cda43c8a8 100644
--- a/tests/tests_integration/tests_core_integration/tests_db_writer_integration/test_postgres_writer_integration.py
+++ b/tests/tests_integration/tests_core_integration/tests_db_writer_integration/test_postgres_writer_integration.py
@@ -6,7 +6,17 @@
 pytestmark = pytest.mark.postgres
 
 
-def test_postgres_writer_snapshot(spark, processing, prepare_schema_table):
+@pytest.mark.parametrize(
+    "options",
+    [
+        {},
+        {"if_exists": "append"},
+        {"if_exists": "replace_entire_table"},
+        {"if_exists": "error"},
+        {"if_exists": "ignore"},
+    ],
+)
+def test_postgres_writer_snapshot(spark, processing, get_schema_table, options):
     df = processing.create_spark_df(spark=spark)
 
     postgres = Postgres(
@@ -20,14 +30,15 @@ def test_postgres_writer_snapshot(spark, processing, prepare_schema_table):
 
     writer = DBWriter(
         connection=postgres,
-        target=prepare_schema_table.full_name,
+        target=get_schema_table.full_name,
+        options=Postgres.WriteOptions(**options),
     )
 
     writer.run(df)
 
     processing.assert_equal_df(
-        schema=prepare_schema_table.schema,
-        table=prepare_schema_table.table,
+        schema=get_schema_table.schema,
+        table=get_schema_table.table,
         df=df,
     )
 
@@ -86,7 +97,7 @@ def test_postgres_writer_snapshot_with_pydantic_options(spark, processing, prepare_schema_table):
     )
 
 
-def test_postgres_writer_mode_append(spark, processing, prepare_schema_table):
+def test_postgres_writer_if_exists_append(spark, processing, prepare_schema_table):
     df = processing.create_spark_df(spark=spark, min_id=1, max_id=1500)
     df1 = df[df.id_int < 1001]
     df2 = df[df.id_int > 1000]
@@ -116,7 +127,70 @@ def test_postgres_writer_if_exists_append(spark, processing, prepare_schema_table):
     )
 
 
-def test_postgres_writer_mode_replace_entire_table(spark, processing, prepare_schema_table):
+def test_postgres_writer_if_exists_error(spark, processing, prepare_schema_table):
+    from pyspark.sql.utils import AnalysisException
+
+    df = processing.create_spark_df(spark=spark, min_id=1, max_id=1500)
+
+    postgres = Postgres(
+        host=processing.host,
+        port=processing.port,
+        user=processing.user,
+        password=processing.password,
+        database=processing.database,
+        spark=spark,
+    )
+
+    writer = DBWriter(
+        connection=postgres,
+        target=prepare_schema_table.full_name,
+        options=Postgres.WriteOptions(if_exists="error"),
+    )
+
+    with pytest.raises(
+        AnalysisException,
+        match=f"Table or view '{prepare_schema_table.full_name}' already exists. SaveMode: ErrorIfExists.",
+    ):
+        writer.run(df)
+
+    empty_df = spark.createDataFrame([], df.schema)
+
+    processing.assert_equal_df(
+        schema=prepare_schema_table.schema,
+        table=prepare_schema_table.table,
+        df=empty_df,
+    )
+
+
+def test_postgres_writer_if_exists_ignore(spark, processing, prepare_schema_table):
+    df = processing.create_spark_df(spark=spark, min_id=1, max_id=1500)
+
+    postgres = Postgres(
+        host=processing.host,
+        port=processing.port,
+        user=processing.user,
+        password=processing.password,
+        database=processing.database,
+        spark=spark,
+    )
+
+    writer = DBWriter(
+        connection=postgres,
+        target=prepare_schema_table.full_name,
+        options=Postgres.WriteOptions(if_exists="ignore"),
+    )
+
+    writer.run(df)  # The write operation is ignored
+    empty_df = spark.createDataFrame([], df.schema)
+
+    processing.assert_equal_df(
+        schema=prepare_schema_table.schema,
+        table=prepare_schema_table.table,
+        df=empty_df,
+    )
+
+
+def test_postgres_writer_if_exists_replace_entire_table(spark, processing, prepare_schema_table):
     df = processing.create_spark_df(spark=spark, min_id=1, max_id=1500)
     df1 = df[df.id_int < 1001]
     df2 = df[df.id_int > 1000]
diff --git a/tests/tests_unit/tests_db_connection_unit/test_jdbc_options_unit.py b/tests/tests_unit/tests_db_connection_unit/test_jdbc_options_unit.py
index ae81402cc..f932408d0 100644
--- a/tests/tests_unit/tests_db_connection_unit/test_jdbc_options_unit.py
+++ b/tests/tests_unit/tests_db_connection_unit/test_jdbc_options_unit.py
@@ -266,6 +266,8 @@ def test_jdbc_write_options_to_jdbc(spark_mock):
     [
         ({}, JDBCTableExistBehavior.APPEND),
         ({"if_exists": "append"}, JDBCTableExistBehavior.APPEND),
+        ({"if_exists": "ignore"}, JDBCTableExistBehavior.IGNORE),
+        ({"if_exists": "error"}, JDBCTableExistBehavior.ERROR),
         ({"if_exists": "replace_entire_table"}, JDBCTableExistBehavior.REPLACE_ENTIRE_TABLE),
     ],
 )
@@ -294,6 +296,18 @@ def test_jdbc_write_options_if_exists(options, value):
             "Mode `overwrite` is deprecated since v0.9.0 and will be removed in v1.0.0. "
             "Use `replace_entire_table` instead",
         ),
+        (
+            {"mode": "ignore"},
+            JDBCTableExistBehavior.IGNORE,
+            "Option `WriteOptions(mode=...)` is deprecated since v0.9.0 and will be removed in v1.0.0. "
+            "Use `WriteOptions(if_exists=...)` instead",
+        ),
+        (
+            {"mode": "error"},
+            JDBCTableExistBehavior.ERROR,
+            "Option `WriteOptions(mode=...)` is deprecated since v0.9.0 and will be removed in v1.0.0. "
+            "Use `WriteOptions(if_exists=...)` instead",
+        ),
     ],
 )
 def test_jdbc_write_options_mode_deprecated(options, value, message):
@@ -305,10 +319,6 @@ def test_jdbc_write_options_mode_deprecated(options, value, message):
 @pytest.mark.parametrize(
     "options",
     [
-        # disallowed modes
-        {"mode": "error"},
-        {"mode": "ignore"},
-        # wrong mode
         {"mode": "wrong_mode"},
     ],
 )
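
Below is a minimal usage sketch of the new write modes from the caller's side, illustrating what this diff enables. It is a sketch, not part of the patch: the host, credentials, table name, and the ``spark``/``df`` objects are placeholders, and the import paths assume onetl's v0.9 layout.

# Hypothetical usage sketch; connection parameters are placeholders,
# not taken from this diff.
from onetl.connection import Postgres
from onetl.db import DBWriter

postgres = Postgres(
    host="postgres.example.com",  # placeholder
    user="myuser",
    password="***",
    database="mydb",
    spark=spark,  # an existing SparkSession
)

# if_exists="error": raise if the target table already exists
writer = DBWriter(
    connection=postgres,
    target="public.target_table",
    options=Postgres.WriteOptions(if_exists="error"),
)

# if_exists="ignore": skip the write entirely if the target table exists
writer = DBWriter(
    connection=postgres,
    target="public.target_table",
    options=Postgres.WriteOptions(if_exists="ignore"),
)

writer.run(df)  # df is an existing Spark DataFrame

With this change ``write_df_to_target`` passes the enum value straight through as the Spark save mode (only ``replace_entire_table`` is remapped to ``overwrite``), so ``"error"`` and ``"ignore"`` land on Spark's ``ErrorIfExists`` and ``Ignore`` save modes. That is exactly what the integration tests assert: ``error`` raises ``AnalysisException`` and leaves the table empty, while ``ignore`` turns the write into a silent no-op.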