Skip to content

Commit

Permalink
#378 Add support for MySQL databases in the JDBC source.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Mar 21, 2024
1 parent fb7e1b2 commit 5b839d3
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 1 deletion.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ There are many other data pipeline management tools. Why you would want to use P
* Extendable
- If your data source or sink is not supported by Pramen yet? You can implement your own very easy.
* Built-in support of various relational database sources
- Pramen already supports getting data from the following RDMS: PostgreSQL, Oracle Data Warehouse, Microsoft SQL Server,
- Pramen already supports getting data from the following RDMS: PostgreSQL, MySql, Oracle Data Warehouse, Microsoft SQL Server,
Denodo Virtualized and other standard JDBC compliant data sources

# Typical Use Case and Benefits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ object SqlGeneratorLoader {
private def fromDriverName(driver: String, sqlConfig: SqlConfig): SqlGenerator = {
driver match {
case "org.postgresql.Driver" => new SqlGeneratorPostgreSQL(sqlConfig)
case "com.mysql.cj.jdbc.Driver" => new SqlGeneratorMySQL(sqlConfig)
case "oracle.jdbc.OracleDriver" => new SqlGeneratorOracle(sqlConfig)
case "net.sourceforge.jtds.jdbc.Driver" => new SqlGeneratorMicrosoft(sqlConfig)
case "com.microsoft.sqlserver.jdbc.SQLServerDriver" => new SqlGeneratorMicrosoft(sqlConfig)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.sql

import za.co.absa.pramen.api.sql.{SqlColumnType, SqlConfig, SqlGeneratorBase}

import java.time.LocalDate
import java.time.format.DateTimeFormatter

class SqlGeneratorMySQL(sqlConfig: SqlConfig) extends SqlGeneratorBase(sqlConfig) {
private val dateFormatterApp = DateTimeFormatter.ofPattern(sqlConfig.dateFormatApp)

override val beginEndEscapeChars: (Char, Char) = ('`', '`')

override def getDtable(sql: String): String = {
if (sql.exists(_ == ' ')) {
s"($sql) t"
} else {
sql
}
}

override def getCountQuery(tableName: String): String = {
s"SELECT COUNT(*) FROM ${escape(tableName)}"
}

override def getCountQuery(tableName: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate): String = {
val where = getWhere(infoDateBegin, infoDateEnd)
s"SELECT COUNT(*) FROM ${escape(tableName)} WHERE $where"
}

override def getDataQuery(tableName: String, columns: Seq[String], limit: Option[Int]): String = {
s"SELECT ${columnExpr(columns)} FROM ${escape(tableName)}${getLimit(limit)}"
}

override def getDataQuery(tableName: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate, columns: Seq[String], limit: Option[Int]): String = {
val where = getWhere(infoDateBegin, infoDateEnd)
s"SELECT ${columnExpr(columns)} FROM ${escape(tableName)} WHERE $where${getLimit(limit)}"
}

override def getWhere(dateBegin: LocalDate, dateEnd: LocalDate): String = {
val dateBeginLit = getDateLiteral(dateBegin)
val dateEndLit = getDateLiteral(dateEnd)

val dateTypes: Array[SqlColumnType] = Array(SqlColumnType.DATETIME)

val infoDateColumnAdjusted =
if (dateTypes.contains(sqlConfig.infoDateType)) {
s"DATE($infoDateColumn)"
} else {
infoDateColumn
}

if (dateBeginLit == dateEndLit) {
s"$infoDateColumnAdjusted = $dateBeginLit"
} else {
s"$infoDateColumnAdjusted >= $dateBeginLit AND $infoDateColumnAdjusted <= $dateEndLit"
}
}

override def getDateLiteral(date: LocalDate): String = {
sqlConfig.infoDateType match {
case SqlColumnType.DATE =>
val dateStr = DateTimeFormatter.ISO_LOCAL_DATE.format(date)
s"'$dateStr'"
case SqlColumnType.DATETIME =>
val dateStr = DateTimeFormatter.ISO_LOCAL_DATE.format(date)
s"'$dateStr'"
case SqlColumnType.STRING =>
val dateStr = dateFormatterApp.format(date)
s"'$dateStr'"
case SqlColumnType.NUMBER =>
val dateStr = dateFormatterApp.format(date)
s"$dateStr"
}
}

private def getLimit(limit: Option[Int]): String = {
limit.map(n => s" LIMIT $n").getOrElse("")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,122 @@ class SqlGeneratorLoaderSuite extends AnyWordSpec with RelationalDbFixture {
}
}

"MySQL SQL generator" should {
val genDate = getSqlGenerator("com.mysql.cj.jdbc.Driver", sqlConfigDate)
val genDateTime = getSqlGenerator("com.mysql.cj.jdbc.Driver", sqlConfigDateTime)
val genStr = getSqlGenerator("com.mysql.cj.jdbc.Driver", sqlConfigString)
val genNum = getSqlGenerator("com.mysql.cj.jdbc.Driver", sqlConfigNumber)
val genEscaped = getSqlGenerator("com.mysql.cj.jdbc.Driver", sqlConfigEscape)

"generate count queries without date ranges" in {
assert(genDate.getCountQuery("A") == "SELECT COUNT(*) FROM A")
}

"generate data queries without date ranges" in {
assert(genDate.getDataQuery("A", Nil, None) == "SELECT * FROM A")
}

"generate data queries when list of columns is specified" in {
assert(genEscaped.getDataQuery("A", columns, None) == "SELECT `A`, `D`, `Column with spaces` FROM `A`")
}

"generate data queries with limit clause date ranges" in {
assert(genDate.getDataQuery("A", Nil, Some(100)) == "SELECT * FROM A LIMIT 100")
}

"generate ranged count queries" when {
"date is in DATE format" in {
assert(genDate.getCountQuery("A", date1, date1) ==
"SELECT COUNT(*) FROM A WHERE D = '2020-08-17'")
assert(genDate.getCountQuery("A", date1, date2) ==
"SELECT COUNT(*) FROM A WHERE D >= '2020-08-17' AND D <= '2020-08-30'")
}

"date is in DATETIME format" in {
assert(genDateTime.getCountQuery("A", date1, date1) ==
"SELECT COUNT(*) FROM A WHERE DATE(D) = '2020-08-17'")
assert(genDateTime.getCountQuery("A", date1, date2) ==
"SELECT COUNT(*) FROM A WHERE DATE(D) >= '2020-08-17' AND DATE(D) <= '2020-08-30'")
}

"date is in STRING format" in {
assert(genStr.getCountQuery("A", date1, date1) ==
"SELECT COUNT(*) FROM A WHERE D = '2020-08-17'")
assert(genStr.getCountQuery("A", date1, date2) ==
"SELECT COUNT(*) FROM A WHERE D >= '2020-08-17' AND D <= '2020-08-30'")
}

"date is in NUMBER format" in {
assert(genNum.getCountQuery("A", date1, date1) ==
"SELECT COUNT(*) FROM A WHERE D = 20200817")
assert(genNum.getCountQuery("A", date1, date2) ==
"SELECT COUNT(*) FROM A WHERE D >= 20200817 AND D <= 20200830")
}

"the table name and column name need to be escaped" in {
assert(genEscaped.getCountQuery("Input Table", date1, date1) ==
"SELECT COUNT(*) FROM `Input Table` WHERE `Info date` = '2020-08-17'")
assert(genEscaped.getCountQuery("Input Table", date1, date2) ==
"SELECT COUNT(*) FROM `Input Table` WHERE `Info date` >= '2020-08-17' AND `Info date` <= '2020-08-30'")
}
}

"generate ranged data queries" when {
"date is in DATE format" in {
assert(genDate.getDataQuery("A", date1, date1, Nil, None) ==
"SELECT * FROM A WHERE D = '2020-08-17'")
assert(genDate.getDataQuery("A", date1, date2, Nil, None) ==
"SELECT * FROM A WHERE D >= '2020-08-17' AND D <= '2020-08-30'")
}

"date is in DATETIME format" in {
assert(genDateTime.getDataQuery("A", date1, date1, Nil, None) ==
"SELECT * FROM A WHERE DATE(D) = '2020-08-17'")
assert(genDateTime.getDataQuery("A", date1, date2, Nil, None) ==
"SELECT * FROM A WHERE DATE(D) >= '2020-08-17' AND DATE(D) <= '2020-08-30'")
}

"date is in STRING format" in {
assert(genStr.getDataQuery("A", date1, date1, Nil, None) ==
"SELECT * FROM A WHERE D = '2020-08-17'")
assert(genStr.getDataQuery("A", date1, date2, Nil, None) ==
"SELECT * FROM A WHERE D >= '2020-08-17' AND D <= '2020-08-30'")
}

"date is in NUMBER format" in {
assert(genNum.getDataQuery("A", date1, date1, Nil, None) ==
"SELECT * FROM A WHERE D = 20200817")
assert(genNum.getDataQuery("A", date1, date2, Nil, None) ==
"SELECT * FROM A WHERE D >= 20200817 AND D <= 20200830")
}

"with limit records" in {
assert(genDate.getDataQuery("A", date1, date1, Nil, Some(100)) ==
"SELECT * FROM A WHERE D = '2020-08-17' LIMIT 100")
assert(genDate.getDataQuery("A", date1, date2, Nil, Some(100)) ==
"SELECT * FROM A WHERE D >= '2020-08-17' AND D <= '2020-08-30' LIMIT 100")
}
}

"getDtable" should {
"return the original table when a table is provided" in {
assert(genDate.getDtable("A") == "A")
}

"wrapped query without alias for SQL queries " in {
assert(genDate.getDtable("SELECT A FROM B") == "(SELECT A FROM B) t")
}
}

"quote" should {
"quote each subfields separately" in {
val actual = genDate.quote("System User.`Table Name`")

assert(actual == "`System User`.`Table Name`")
}
}
}

"DB2 SQL generator" should {
val genDate = getSqlGenerator("com.ibm.db2.jcc.DB2Driver", sqlConfigDate)
val genDateTime = getSqlGenerator("com.ibm.db2.jcc.DB2Driver", sqlConfigDateTime)
Expand Down

0 comments on commit 5b839d3

Please sign in to comment.