Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add delay clean table lock #905

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/tidb.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ lease = "10s"
# turn off this option if there will be a large number of tables created.
split-table = true

# delay-clean-table-lock is used to control whether delayed-release the table lock in the abnormal situation. (Milliseconds)
delay-clean-table-lock = 60000

# The limit of concurrent executed sessions.
# token-limit = 1000

Expand Down
28 changes: 26 additions & 2 deletions core/src/main/scala/com/pingcap/tispark/TiBatchWrite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ import scala.collection.JavaConverters._
import scala.collection.mutable

object TiBatchWrite {
// Milliseconds
private val MIN_DELAY_CLEAN_TABLE_LOCK = 60000
private val DELAY_CLEAN_TABLE_LOCK_AND_COMMIT_BACKOFF_DELTA = 30000
private val PRIMARY_KEY_COMMIT_BACKOFF = MIN_DELAY_CLEAN_TABLE_LOCK - DELAY_CLEAN_TABLE_LOCK_AND_COMMIT_BACKOFF_DELTA

type SparkRow = org.apache.spark.sql.Row
type TiRow = com.pingcap.tikv.row.Row
type TiDataType = com.pingcap.tikv.types.DataType
Expand Down Expand Up @@ -146,7 +151,26 @@ class TiBatchWrite(@transient val df: DataFrame,

// lock table
tiDBJDBCClient = new TiDBJDBCClient(TiDBUtils.createConnectionFactory(options.url)())
isEnableTableLock = tiDBJDBCClient.isEnableTableLock
isEnableTableLock = {
if (tiDBJDBCClient.isEnableTableLock) {
if (tiDBJDBCClient.getDelayCleanTableLock >= MIN_DELAY_CLEAN_TABLE_LOCK) {
birdstorm marked this conversation as resolved.
Show resolved Hide resolved
true
} else {
logger.warn(
s"table lock disabled! to enable table lock, please set tidb config: delay-clean-table-lock >= $MIN_DELAY_CLEAN_TABLE_LOCK"
)
false
}
} else {
false
}
}
if (!isEnableTableLock) {
logger.warn(
s"table lock disabled! to enable table lock, please set tidb config: enable-table-lock = true"
)
}

isEnableSplitRegion = tiDBJDBCClient.isEnableSplitTable
lockTable()

Expand Down Expand Up @@ -324,7 +348,7 @@ class TiBatchWrite(@transient val df: DataFrame,
s"invalid transaction tso with startTs=$startTs, commitTs=$commitTs"
)
}
val commitPrimaryBackoff = ConcreteBackOffer.newCustomBackOff(BackOffer.BATCH_COMMIT_BACKOFF)
val commitPrimaryBackoff = ConcreteBackOffer.newCustomBackOff(PRIMARY_KEY_COMMIT_BACKOFF)
birdstorm marked this conversation as resolved.
Show resolved Hide resolved

if (connectionLost()) {
throw new TiBatchWriteException("tidb's jdbc connection is lost!")
Expand Down
16 changes: 16 additions & 0 deletions tikv-client/src/main/java/com/pingcap/tikv/TiDBJDBCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class TiDBJDBCClient implements AutoCloseable {
private static final String SELECT_TIDB_CONFIG_SQL = "select @@tidb_config";
private static final String ENABLE_TABLE_LOCK_KEY = "enable-table-lock";
private static final Boolean ENABLE_TABLE_LOCK_DEFAULT = false;
private static final String DELAY_CLEAN_TABLE_LOCK = "delay-clean-table-lock";
private static final int DELAY_CLEAN_TABLE_LOCK_DEFAULT = 0;
private static final String ENABLE_SPLIT_TABLE_KEY = "split-table";
private static final Boolean ENABLE_SPLIT_TABLE_DEFAULT = false;

Expand All @@ -45,6 +47,20 @@ public boolean isEnableTableLock() throws IOException, SQLException {
return (Boolean) enableTableLock;
}

/**
* get delay clean table lock
zhexuany marked this conversation as resolved.
Show resolved Hide resolved
*
* @return Milliseconds
* @throws IOException
* @throws SQLException
*/
public int getDelayCleanTableLock() throws IOException, SQLException {
Map<String, Object> configMap = readConfMapFromTiDB();
Object enableTableLock =
configMap.getOrDefault(DELAY_CLEAN_TABLE_LOCK, DELAY_CLEAN_TABLE_LOCK_DEFAULT);
return (int) enableTableLock;
}

public boolean lockTableWriteLocal(String databaseName, String tableName) throws SQLException {
try (Statement tidbStmt = connection.createStatement()) {
String sql = "lock tables `" + databaseName + "`.`" + tableName + "` write local";
Expand Down