-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathQueryExecutorJdbc.scala
105 lines (89 loc) · 3.27 KB
/
QueryExecutorJdbc.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package za.co.absa.pramen.core.utils.hive
import org.slf4j.LoggerFactory
import za.co.absa.pramen.core.reader.JdbcUrlSelector
import za.co.absa.pramen.core.reader.model.JdbcConfig
import java.sql.{Connection, ResultSet, SQLException, SQLSyntaxErrorException}
import scala.util.{Failure, Try}
import scala.util.control.NonFatal
class QueryExecutorJdbc(jdbcUrlSelector: JdbcUrlSelector) extends QueryExecutor {
private val log = LoggerFactory.getLogger(this.getClass)
private var connection: Connection = _
private val defaultRetries = jdbcUrlSelector.getNumberOfUrls
private val retries =
jdbcUrlSelector.jdbcConfig.retries.getOrElse(defaultRetries)
override def doesTableExist(dbName: Option[String], tableName: String): Boolean = {
val fullTableName = HiveHelper.getFullTable(dbName, tableName)
val query = if (jdbcUrlSelector.jdbcConfig.optimizedExistQuery) {
s"DESCRIBE $fullTableName"
} else {
s"SELECT 1 FROM $fullTableName WHERE 0 = 1"
}
Try {
execute(query)
} match {
case Failure(ex) =>
log.info(s"The query resulted in an error, assuming the table $fullTableName does not exist" + ex.getMessage)
false
case _ =>
log.info(s"Table $fullTableName exists.")
true
}
}
@throws[SQLSyntaxErrorException]
override def execute(query: String): Unit = {
log.info(s"Executing SQL: $query")
executeActionOnConnection { conn =>
val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
try {
statement.execute(query)
} finally {
statement.close()
}
}
}
override def close(): Unit = if (connection != null) connection.close()
private[core] def executeActionOnConnection(action: Connection => Boolean): Boolean = {
val currentConnection = getConnection(forceReconnect = false)
try {
action(currentConnection)
} catch {
case ex: SQLException =>
throw ex
case NonFatal(ex) =>
log.warn(s"Got an error on existing connection. Retrying...", ex)
action(getConnection(forceReconnect = true))
}
}
def getConnection(forceReconnect: Boolean): Connection = {
if (connection == null) {
jdbcUrlSelector.logConnectionSettings()
}
if (connection == null || forceReconnect) {
val (newConnection, url) = jdbcUrlSelector.getWorkingConnection(retries)
log.info(s"Selected query executor connection: $url")
connection = newConnection
}
connection
}
}
object QueryExecutorJdbc {
def fromJdbcConfig(jdbcConfig: JdbcConfig): QueryExecutorJdbc = {
val jdbcUrlSelector = JdbcUrlSelector(jdbcConfig)
new QueryExecutorJdbc(jdbcUrlSelector)
}
}