[SPARK-48385][SQL] Migrate the jdbc driver of mariadb from 2.x to 3.x #46655

Closed
@@ -25,9 +25,9 @@ import org.apache.spark.sql.execution.datasources.jdbc.connection.SecureConnecti
import org.apache.spark.tags.DockerTest

/**
- * To run this test suite for a specific version (e.g., mariadb:10.5.12):
+ * To run this test suite for a specific version (e.g., mariadb:10.5.24):
* {{{
- * ENABLE_DOCKER_INTEGRATION_TESTS=1 MARIADB_DOCKER_IMAGE_NAME=mariadb:10.5.12
+ * ENABLE_DOCKER_INTEGRATION_TESTS=1 MARIADB_DOCKER_IMAGE_NAME=mariadb:10.5.24
* ./build/sbt -Pdocker-integration-tests
* "docker-integration-tests/testOnly org.apache.spark.sql.jdbc.MariaDBKrbIntegrationSuite"
* }}}
@@ -38,15 +38,15 @@ class MariaDBKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite {
override protected val keytabFileName = "mariadb.keytab"

override val db = new DatabaseOnDocker {
- override val imageName = sys.env.getOrElse("MARIADB_DOCKER_IMAGE_NAME", "mariadb:10.5.12")
+ override val imageName = sys.env.getOrElse("MARIADB_DOCKER_IMAGE_NAME", "mariadb:10.5.24")
override val env = Map(
"MYSQL_ROOT_PASSWORD" -> "rootpass"
)
override val usesIpc = false
override val jdbcPort = 3306

override def getJdbcUrl(ip: String, port: Int): String =
- s"jdbc:mysql://$ip:$port/mysql?user=$principal"
+ s"jdbc:mysql://$ip:$port/mysql?permitMysqlScheme&user=$principal"

override def getEntryPoint: Option[String] =
Some("/docker-entrypoint/mariadb_docker_entrypoint.sh")
@@ -154,30 +154,43 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
test("SPARK-47462: Unsigned numeric types") {
val df = sqlContext.read.jdbc(jdbcUrl, "unsigned_numbers", new Properties)
val rows = df.head()
assert(rows.get(0).isInstanceOf[Short])
assert(rows.get(1).isInstanceOf[Integer])
assert(rows.get(2).isInstanceOf[Integer])
assert(rows.get(3).isInstanceOf[Long])
assert(rows.get(4).isInstanceOf[BigDecimal])
assert(rows.get(5).isInstanceOf[BigDecimal])
assert(rows.get(6).isInstanceOf[Double])
// Unlike MySQL, MariaDB seems not to distinguish signed and unsigned tinyint(1).
val isMaria = jdbcUrl.indexOf("disableMariaDbDriver") == -1
if (isMaria) {
assert(rows.get(0).isInstanceOf[Integer])
assert(rows.get(1).isInstanceOf[Long])
assert(rows.get(2).isInstanceOf[Integer])
assert(rows.get(3).isInstanceOf[BigDecimal])
Member

Could you describe the root cause of this change, @panbingkun? To me, this looks suspiciously like a breaking change.

- assert(rows.get(3).isInstanceOf[Long])
+ assert(rows.get(3).isInstanceOf[BigDecimal])

Contributor Author

@panbingkun May 23, 2024

The root cause of this change:

  • Changes:
    index[0]: Short => Integer
    index[1]: Integer => Long
    index[3]: Long => Decimal

  • Before - (mariadb jdbc driver: 2.7.12):
    [image]

  • After - (mariadb jdbc driver: 3.4.0):
    [image]

  • Summary:
    The data types obtained from the underlying metadata differ between the two driver versions.
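
To make the listed changes concrete, here is a minimal sketch (not part of the PR) that computes the narrowest signed JVM type able to hold each value asserted in this test for indexes 0 to 4. The object and method names are hypothetical. Under that assumption, the results line up with the types observed with driver 2.7.12, which suggests the types observed with 3.4.0 at indexes 0, 1 and 3 are wider than the values strictly require.

```scala
object UnsignedRanges {
  // Values the test asserts for columns 0 to 4 of `unsigned_numbers`.
  val asserted: Seq[BigInt] = Seq(
    BigInt(255),
    BigInt(65535),
    BigInt(16777215),
    BigInt("4294967295"),
    BigInt("9223372036854775808"))

  // Name of the narrowest signed JVM type that can hold `v`;
  // anything beyond Long falls back to BigDecimal.
  def narrowestSigned(v: BigInt): String =
    if (v.isValidShort) "Short"
    else if (v.isValidInt) "Integer"
    else if (v.isValidLong) "Long"
    else "BigDecimal"

  def main(args: Array[String]): Unit =
    asserted.zipWithIndex.foreach { case (v, i) =>
      println(s"index[$i] value=$v -> ${narrowestSigned(v)}")
    }
}
```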

Contributor Author

We could also add compatibility logic in MySQLDialect#getCatalystType. Do we need to do this in this PR?

Contributor Author

We can make changes as follows to keep this compatible:
[image]

If this is needed, should the related changes go directly into this PR, or into a new, separate PR?
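
The screenshot of the proposed change is not preserved here. As a purely illustrative sketch of what such compatibility logic could look like, the function below maps a column type name to the Catalyst type name observed with driver 2.7.12 and returns None otherwise, mirroring the way JdbcDialect.getCatalystType returns an Option so that unmatched columns fall through to the default mapping. The object name, the function, and the type-name spellings such as "INT UNSIGNED" are assumptions: the column SQL types are inferred from the asserted values, and the strings have not been verified against what the driver reports. Plain strings stand in for Spark's DataType so the sketch has no dependencies.

```scala
import java.util.Locale

object UnsignedCompatSketch {
  // Hypothetical override table: column type name -> Catalyst type name
  // seen with driver 2.7.12. Only the three indexes that changed are listed.
  private val overrides: Map[String, String] = Map(
    "TINYINT UNSIGNED" -> "ShortType",
    "SMALLINT UNSIGNED" -> "IntegerType",
    "INT UNSIGNED" -> "LongType")

  // Some(type) when an override applies; None lets the default mapping decide.
  def compatibleType(typeName: String): Option[String] =
    overrides.get(typeName.trim.toUpperCase(Locale.ROOT))
}
```

Keying on the type name rather than the JDBC type code is a design choice of this sketch, since the type codes are exactly what appear to differ between the two driver versions.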

Member

After - (mariadb jdbc driver: 3.4.0):

These MariaDB SQL data type <-> JDBC data type mapping rules seem incorrect. Have they stated anywhere why they made this change?

Contributor Author

Let me try to find it.
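
One way to compare what the two driver versions report is to dump the result set metadata for the same query under each version and diff the output. The sketch below is an assumption about how that could be done, not something taken from the PR: ColumnMetaDump, ColumnMeta, describe and render are hypothetical names, while the ResultSetMetaData and JDBCType calls are standard java.sql APIs.

```scala
import java.sql.{JDBCType, ResultSetMetaData}
import scala.util.Try

object ColumnMetaDump {
  final case class ColumnMeta(
      label: String,
      jdbcType: Int,
      typeName: String,
      signed: Boolean,
      precision: Int)

  // Reads the per-column metadata that a JDBC driver reports for a result set.
  // JDBC column indexes are 1-based.
  def describe(md: ResultSetMetaData): Seq[ColumnMeta] =
    (1 to md.getColumnCount).map { i =>
      ColumnMeta(
        md.getColumnLabel(i),
        md.getColumnType(i),
        md.getColumnTypeName(i),
        md.isSigned(i),
        md.getPrecision(i))
    }

  // Renders one column as a single line, suitable for diffing two runs.
  // Unknown type codes are printed as their numeric value.
  def render(c: ColumnMeta): String = {
    val jdbcName = Try(JDBCType.valueOf(c.jdbcType).getName).getOrElse(c.jdbcType.toString)
    s"${c.label}: jdbc=$jdbcName typeName=${c.typeName} signed=${c.signed} precision=${c.precision}"
  }
}
```

With an open connection, the output of `describe(resultSet.getMetaData).map(render)` for a query against `unsigned_numbers` could be captured once per driver version and compared line by line.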

assert(rows.get(4).isInstanceOf[BigDecimal])
assert(rows.get(5).isInstanceOf[BigDecimal])
assert(rows.get(6).isInstanceOf[Double])
// Unlike MySQL, MariaDB seems not to distinguish signed and unsigned tinyint(1).
assert(rows.get(7).isInstanceOf[Boolean])
} else {
assert(rows.get(7).isInstanceOf[Short])
}
assert(rows.getShort(0) === 255)
assert(rows.getInt(1) === 65535)
assert(rows.getInt(2) === 16777215)
assert(rows.getLong(3) === 4294967295L)
assert(rows.getAs[BigDecimal](4).equals(new BigDecimal("9223372036854775808")))
assert(rows.getAs[BigDecimal](5).equals(new BigDecimal("123456789012345.123456789012345000")))
assert(rows.getDouble(6) === 1.0000000000000002)
if (isMaria) {

assert(rows.getInt(0) === 255)
assert(rows.getLong(1) === 65535L)
assert(rows.getInt(2) === 16777215)
assert(rows.getAs[BigDecimal](3).equals(new BigDecimal("4294967295")))
assert(rows.getAs[BigDecimal](4).equals(new BigDecimal("9223372036854775808")))
assert(rows.getAs[BigDecimal](5).equals(new BigDecimal("123456789012345.123456789012345000")))
assert(rows.getDouble(6) === 1.0000000000000002)
assert(rows.getBoolean(7) === false)
} else {
assert(rows.get(0).isInstanceOf[Short])
assert(rows.get(1).isInstanceOf[Integer])
assert(rows.get(2).isInstanceOf[Integer])
assert(rows.get(3).isInstanceOf[Long])
assert(rows.get(4).isInstanceOf[BigDecimal])
assert(rows.get(5).isInstanceOf[BigDecimal])
assert(rows.get(6).isInstanceOf[Double])
assert(rows.get(7).isInstanceOf[Short])

assert(rows.getShort(0) === 255)
assert(rows.getInt(1) === 65535)
assert(rows.getInt(2) === 16777215)
assert(rows.getLong(3) === 4294967295L)
assert(rows.getAs[BigDecimal](4).equals(new BigDecimal("9223372036854775808")))
assert(rows.getAs[BigDecimal](5).equals(new BigDecimal("123456789012345.123456789012345000")))
assert(rows.getDouble(6) === 1.0000000000000002)
assert(rows.getShort(7) === 0)
}
}
@@ -367,13 +380,13 @@ class MySQLOverMariaConnectorIntegrationSuite extends MySQLIntegrationSuite {

override val db = new MySQLDatabaseOnDocker {
override def getJdbcUrl(ip: String, port: Int): String =
- s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true" +
- s"&useSSL=false"
+ s"jdbc:mysql://$ip:$port/mysql?permitMysqlScheme&user=root&password=rootpass" +
+ s"&allowPublicKeyRetrieval=true&useSSL=false"
}

override def testConnection(): Unit = {
Using.resource(getConnection()) { conn =>
- assert(conn.getClass.getName === "org.mariadb.jdbc.MariaDbConnection")
+ assert(conn.getClass.getName === "org.mariadb.jdbc.Connection")
}
}
}
@@ -177,7 +177,7 @@ class MySQLOverMariaConnectorIntegrationSuite extends MySQLIntegrationSuite {

override val db = new MySQLDatabaseOnDocker {
override def getJdbcUrl(ip: String, port: Int): String =
- s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true" +
- s"&useSSL=false"
+ s"jdbc:mysql://$ip:$port/mysql?permitMysqlScheme&user=root&password=rootpass" +
+ s"&allowPublicKeyRetrieval=true&useSSL=false"
}
}
2 changes: 1 addition & 1 deletion pom.xml
@@ -322,7 +322,7 @@
-Djdk.reflect.useDirectMethodHandle=false
-Dio.netty.tryReflectionSetAccessible=true
</extraJavaTestArgs>
<mariadb.java.client.version>2.7.12</mariadb.java.client.version>
<mariadb.java.client.version>3.4.0</mariadb.java.client.version>
<mysql.connector.version>8.4.0</mysql.connector.version>
<postgresql.version>42.7.3</postgresql.version>
<db2.jcc.version>11.5.9.0</db2.jcc.version>
@@ -22,6 +22,7 @@ class MariaDBConnectionProviderSuite extends ConnectionProviderSuiteBase {
val provider = new MariaDBConnectionProvider()
val driver = registerDriver(provider.driverClass)

- testSecureConnectionProvider(provider, driver, options("jdbc:mysql://localhost/mysql"))
+ testSecureConnectionProvider(provider, driver,
+ options("jdbc:mysql://localhost/mysql?permitMysqlScheme"))
}
}
@@ -1400,7 +1400,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession {

test("SPARK-18419: Fix `asConnectionProperties` to filter case-insensitively") {
val parameters = Map(
- "url" -> "jdbc:mysql://localhost:3306/temp",
+ "url" -> "jdbc:mysql://localhost:3306/temp?permitMysqlScheme",
"dbtable" -> "t1",
"numPartitions" -> "10")
assert(new JDBCOptions(parameters).asConnectionProperties.isEmpty)