Skip to content

Commit

Permalink
[DOP-23742] Fix using conn.setReadOnly(...) for MySQL, MSSQL
Browse files Browse the repository at this point in the history
  • Loading branch information
dolfinus committed Feb 6, 2025
1 parent 64550d1 commit 73dcefa
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 9 deletions.
4 changes: 4 additions & 0 deletions docs/changelog/next_release/337.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Previously ``MSSQL.fetch(...)`` and ``MySQL.fetch(...)`` opened a read-write connection, despite the documentation saying it should be read-only.
Now this is fixed:
* ``MSSQL.fetch(...)`` establishes connection with ``ApplicationIntent=ReadOnly``.
* ``MySQL.fetch(...)`` calls ``SET SESSION TRANSACTION READ ONLY`` statement.
14 changes: 6 additions & 8 deletions onetl/connection/db_connection/jdbc_mixin/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,12 @@ def _options_to_connection_properties(self, options: JDBCFetchOptions | JDBCExec
)
return jdbc_options.asConnectionProperties()

def _get_jdbc_connection(self, options: JDBCFetchOptions | JDBCExecuteOptions):
def _get_jdbc_connection(self, options: JDBCFetchOptions | JDBCExecuteOptions, read_only: bool):
connection_properties = self._options_to_connection_properties(options)
driver_manager = self.spark._jvm.java.sql.DriverManager # type: ignore
return driver_manager.getConnection(self.jdbc_url, connection_properties)
connection = driver_manager.getConnection(self.jdbc_url, connection_properties)
connection.setReadOnly(read_only) # type: ignore
return connection

def _get_spark_dialect_name(self) -> str:
"""
Expand All @@ -389,7 +391,6 @@ def _get_spark_dialect(self):

def _get_statement_args(self) -> tuple[int, ...]:
    """Return ``(resultSetType, resultSetConcurrency)`` flags for building java.sql statements.

    TYPE_FORWARD_ONLY: the cursor may only move forward through the result set.
    CONCUR_READ_ONLY: the result set cannot be updated in place.
    """
    resultset = self.spark._jvm.java.sql.ResultSet  # type: ignore

    return resultset.TYPE_FORWARD_ONLY, resultset.CONCUR_READ_ONLY

def _execute_on_driver(
Expand All @@ -409,13 +410,10 @@ def _execute_on_driver(
Each time new connection is opened to execute the statement, and then closed.
"""

jdbc_connection = self._get_jdbc_connection(options)
statement_args = self._get_statement_args()
jdbc_connection = self._get_jdbc_connection(options, read_only)
with closing(jdbc_connection):
jdbc_connection.setReadOnly(read_only) # type: ignore

statement_args = self._get_statement_args()
jdbc_statement = self._build_statement(statement, statement_type, jdbc_connection, statement_args)

return self._execute_statement(jdbc_statement, statement, options, callback, read_only)

def _execute_statement(
Expand Down
14 changes: 14 additions & 0 deletions onetl/connection/db_connection/mssql/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
from onetl._util.classproperty import classproperty
from onetl._util.version import Version
from onetl.connection.db_connection.jdbc_connection import JDBCConnection
from onetl.connection.db_connection.jdbc_mixin.options import (
JDBCExecuteOptions,
JDBCFetchOptions,
)
from onetl.connection.db_connection.mssql.dialect import MSSQLDialect
from onetl.connection.db_connection.mssql.options import (
MSSQLExecuteOptions,
Expand Down Expand Up @@ -277,3 +281,13 @@ def __str__(self):

port = self.port or 1433
return f"{self.__class__.__name__}[{self.host}:{port}/{self.database}]"

def _get_jdbc_connection(self, options: JDBCFetchOptions | JDBCExecuteOptions, read_only: bool):
    """Open a JDBC connection, requesting read-only intent via connection options when asked.

    ``SQLServerConnection.setReadOnly()`` is a no-op in the MSSQL JDBC driver:
    https://learn.microsoft.com/en-us/sql/connect/jdbc/reference/setreadonly-method-sqlserverconnection?view=sql-server-ver16
    so the read-only mode has to be requested through the ``ApplicationIntent=ReadOnly``
    connection option instead:
    https://github.com/microsoft/mssql-jdbc/issues/484
    """
    effective_options = options.copy(update={"ApplicationIntent": "ReadOnly"}) if read_only else options
    return super()._get_jdbc_connection(effective_options, read_only)
18 changes: 18 additions & 0 deletions onetl/connection/db_connection/mysql/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@
from __future__ import annotations

import warnings
from contextlib import closing
from typing import ClassVar, Optional

from etl_entities.instance import Host

from onetl._util.classproperty import classproperty
from onetl._util.version import Version
from onetl.connection.db_connection.jdbc_connection import JDBCConnection
from onetl.connection.db_connection.jdbc_mixin.options import (
JDBCExecuteOptions,
JDBCFetchOptions,
)
from onetl.connection.db_connection.mysql.dialect import MySQLDialect
from onetl.connection.db_connection.mysql.options import (
MySQLExecuteOptions,
Expand Down Expand Up @@ -178,3 +183,16 @@ def instance_url(self) -> str:

def __str__(self):
return f"{self.__class__.__name__}[{self.host}:{self.port}]"

def _get_jdbc_connection(self, options: JDBCFetchOptions | JDBCExecuteOptions, read_only: bool):
    """Open a JDBC connection and explicitly set the session access mode.

    ``connection.setReadOnly()`` is a no-op in the MySQL JDBC driver, so the
    session type has to be changed with an explicit statement instead:
    https://stackoverflow.com/questions/10240890/sql-open-connection-in-read-only-mode#comment123789248_48959180
    https://dev.mysql.com/doc/refman/8.4/en/set-transaction.html
    """
    connection = super()._get_jdbc_connection(options, read_only)

    try:
        transaction = "READ ONLY" if read_only else "READ WRITE"
        statement = connection.prepareStatement(f"SET SESSION TRANSACTION {transaction};")
        with closing(statement):
            statement.execute()
    except Exception:
        # don't leak the freshly opened connection if the session setup fails —
        # callers only close the connection *returned* by this method
        connection.close()
        raise

    return connection
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ def test_clickhouse_connection_fetch(spark, processing, load_table_data, suffix,
with pytest.raises(Exception):
clickhouse.fetch(f"SELEC 1{suffix}")

# fetch is always read-only
with pytest.raises(Exception):
clickhouse.fetch(f"DROP TABLE {table}{suffix}")


@pytest.mark.parametrize("suffix", ["", ";"])
def test_clickhouse_connection_execute_ddl(spark, processing, get_schema_table, suffix):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ def test_greenplum_connection_fetch(spark, processing, load_table_data, suffix):
with pytest.raises(Exception):
greenplum.fetch(f"SELEC 1{suffix}")

# fetch is read-only
with pytest.raises(Exception):
greenplum.fetch(f"DROP TABLE {table}{suffix}")


@pytest.mark.parametrize("suffix", ["", ";"])
def test_greenplum_connection_ddl(spark, processing, get_schema_table, suffix):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ def test_mssql_connection_fetch(spark, processing, load_table_data, suffix):
with pytest.raises(Exception):
mssql.fetch(f"SELEC 1{suffix}")

# fetch is always read-only
with pytest.raises(Exception):
mssql.fetch(f"DROP TABLE {table}{suffix}")


@pytest.mark.parametrize("suffix", ["", ";"])
def test_mssql_connection_execute_ddl(spark, processing, get_schema_table, suffix):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ def test_mysql_connection_fetch(spark, processing, load_table_data, suffix):
with pytest.raises(Exception):
mysql.fetch(f"SELEC 1{suffix}")

# fetch is always read-only
with pytest.raises(Exception):
mysql.fetch(f"DROP TABLE {table}{suffix}")


@pytest.mark.parametrize("suffix", ["", ";"])
def test_mysql_connection_execute_ddl(spark, processing, get_schema_table, suffix):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ def test_oracle_connection_fetch(spark, processing, load_table_data, suffix):
filtered_df = table_df[table_df.ID_INT < 50]
processing.assert_equal_df(df=df, other_frame=filtered_df, order_by="id_int")

# fetch is always read-only
with pytest.raises(Exception):
oracle.fetch(f"DROP TABLE {table}{suffix}")

# not supported by JDBC, use SELECT * FROM v$tables
with pytest.raises(Exception):
oracle.fetch(f"SHOW TABLES{suffix}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,13 @@ def test_postgres_connection_fetch(spark, processing, load_table_data, suffix, c
with pytest.raises(Exception):
postgres.fetch(f"SELEC 1{suffix}")

# fetch is read-only
with pytest.raises(Exception):
postgres.fetch(f"DROP TABLE {table}{suffix}")


@pytest.mark.parametrize("suffix", ["", ";"])
def test_postgres_connection_ddl(spark, processing, get_schema_table, suffix):
def test_postgres_connection_execute_ddl(spark, processing, get_schema_table, suffix):
postgres = Postgres(
host=processing.host,
port=processing.port,
Expand Down

0 comments on commit 73dcefa

Please sign in to comment.