fix BatchGet stuck bug

marsishandsome committed Aug 11, 2020
1 parent 7bd6fb3 commit 0de8e26
Showing 4 changed files with 105 additions and 84 deletions.
@@ -425,7 +425,7 @@ class TiBatchWriteTable(

if (handleCol != null) {
val (batchHandle, handleMap) = genNextHandleBatch(batch)
val oldValueList = snapshot.batchGet(batchHandle)
val oldValueList = snapshot.batchGet(options.batchGetBackOfferMS, batchHandle)
(0 until oldValueList.size()).foreach { i =>
val oldValuePair = oldValueList.get(i)
val oldValue = oldValuePair.getValue
@@ -442,7 +442,7 @@ class TiBatchWriteTable(
val oldIndicesBatch: util.List[Array[Byte]] = new util.ArrayList[Array[Byte]]()
uniqueIndices.foreach { index =>
val (batchIndices, rowMap) = genNextUniqueIndexBatch(batch, index)
val oldValueList = snapshot.batchGet(batchIndices)
val oldValueList = snapshot.batchGet(options.batchGetBackOfferMS, batchIndices)
(0 until oldValueList.size()).foreach { i =>
val oldValuePair = oldValueList.get(i)
val oldValue = oldValuePair.getValue
@@ -456,7 +456,7 @@ class TiBatchWriteTable(
}
}

val oldIndicesRowPairs = snapshot.batchGet(oldIndicesBatch)
val oldIndicesRowPairs = snapshot.batchGet(options.batchGetBackOfferMS, oldIndicesBatch)
(0 until oldIndicesRowPairs.size()).foreach { i =>
val oldIndicesRowPair = oldIndicesRowPairs.get(i)
val oldRowKey = oldIndicesRowPair.getKey
@@ -64,7 +64,9 @@ class TiDBOptions(@transient val parameters: CaseInsensitiveMap[String]) extends
// ttlMode = { "FIXED", "UPDATE", "DEFAULT" }
val ttlMode: String = getOrDefault(TIDB_TTL_MODE, "DEFAULT").toUpperCase()
val useSnapshotBatchGet: Boolean = getOrDefault(TIDB_USE_SNAPSHOT_BATCH_GET, "true").toBoolean
val snapshotBatchGetSize: Int = getOrDefault(TIDB_SNAPSHOT_BATCH_GET_SIZE, "2048").toInt
//20k
val snapshotBatchGetSize: Int = getOrDefault(TIDB_SNAPSHOT_BATCH_GET_SIZE, "20480").toInt
val batchGetBackOfferMS: Int = getOrDefault(TIDB_BATCH_GET_BACKOFFER_MS, "60000").toInt
val sleepAfterPrewritePrimaryKey: Long =
getOrDefault(TIDB_SLEEP_AFTER_PREWRITE_PRIMARY_KEY, "0").toLong
val sleepAfterPrewriteSecondaryKey: Long =
@@ -181,6 +183,7 @@ object TiDBOptions {
val TIDB_TTL_MODE: String = newOption("ttlMode")
val TIDB_USE_SNAPSHOT_BATCH_GET: String = newOption("useSnapshotBatchGet")
val TIDB_SNAPSHOT_BATCH_GET_SIZE: String = newOption("snapshotBatchGetSize")
val TIDB_BATCH_GET_BACKOFFER_MS: String = newOption("batchGetBackOfferMS")
val TIDB_USE_TABLE_LOCK: String = newOption("useTableLock")
val TIDB_MULTI_TABLES: String = newOption("multiTables")
val TIDB_TASK_NUM_PER_REGION: String = newOption("taskNumPerRegion")
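
For reference, the new batchGetBackOfferMS option would be passed alongside the existing writer options. The sketch below is hypothetical and not part of this commit: the "tidb" format name, the database/table option keys, and the wrapper class are assumptions; only snapshotBatchGetSize and batchGetBackOfferMS correspond to the options touched here.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class BatchGetBackOffExample {
  // Hypothetical write-path sketch: every option key other than snapshotBatchGetSize
  // and batchGetBackOfferMS is assumed boilerplate, not taken from this commit.
  public static void writeWithBackOffBudget(Dataset<Row> df) {
    df.write()
        .format("tidb")
        .option("database", "test")
        .option("table", "target_table")
        .option("snapshotBatchGetSize", "20480") // keys fetched per snapshot batch-get round
        .option("batchGetBackOfferMS", "60000")  // total back-off budget in milliseconds
        .mode(SaveMode.Append)
        .save();
  }
}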
173 changes: 95 additions & 78 deletions tikv-client/src/main/java/com/pingcap/tikv/KVClient.java
@@ -29,18 +29,14 @@
import com.pingcap.tikv.util.BackOffer;
import com.pingcap.tikv.util.ConcreteBackOffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,25 +89,15 @@ public ByteString get(ByteString key, long version) throws GrpcException {
/**
* Get a set of key-value pair by keys from TiKV
*
* @param keys keys
* @param backOffer
* @param keys
* @param version
* @return
* @throws GrpcException
*/
public List<KvPair> batchGet(List<ByteString> keys, long version) throws GrpcException {
return batchGet(ConcreteBackOffer.newBatchGetMaxBackOff(), keys, version);
}

private List<KvPair> batchGet(BackOffer backOffer, List<ByteString> keys, long version) {
Set<ByteString> set = new HashSet<>(keys);
return batchGet(backOffer, set, version);
}

private List<KvPair> batchGet(BackOffer backOffer, Set<ByteString> keys, long version) {
Map<TiRegion, List<ByteString>> groupKeys = groupKeysByRegion(keys);
List<Batch> batches = new ArrayList<>();

for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(batches, entry.getKey(), entry.getValue(), BATCH_GET_SIZE);
}
return sendBatchGet(backOffer, batches, version);
public List<KvPair> batchGet(BackOffer backOffer, List<ByteString> keys, long version)
throws GrpcException {
return doSendBatchGet(backOffer, keys, version);
}

/**
@@ -149,26 +135,101 @@ public List<Kvrpcpb.KvPair> scan(ByteString startKey, long version) throws GrpcException {
return scan(startKey, version, Integer.MAX_VALUE);
}

private List<KvPair> doSendBatchGet(BackOffer backOffer, List<ByteString> keys, long version) {
ExecutorCompletionService<List<KvPair>> completionService =
new ExecutorCompletionService<>(executorService);

Map<TiRegion, List<ByteString>> groupKeys = groupKeysByRegion(keys);
List<Batch> batches = new ArrayList<>();

for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(batches, entry.getKey(), entry.getValue(), BATCH_GET_SIZE);
}

for (Batch batch : batches) {
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
completionService.submit(
() -> doSendBatchGetInBatchesWithRetry(singleBatchBackOffer, batch, version));
}

try {
List<KvPair> result = new ArrayList<>();
for (int i = 0; i < batches.size(); i++) {
result.addAll(completionService.take().get());
}
return result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TiKVException("Current thread interrupted.", e);
} catch (ExecutionException e) {
throw new TiKVException("Execution exception met.", e);
}
}

private List<KvPair> doSendBatchGetInBatchesWithRetry(
BackOffer backOffer, Batch batch, long version) {
TiRegion oldRegion = batch.region;
TiRegion currentRegion =
clientBuilder.getRegionManager().getRegionById(backOffer, oldRegion.getId());

if (oldRegion.equals(currentRegion)) {
RegionStoreClient client = clientBuilder.build(batch.region);
try {
return client.batchGet(backOffer, batch.keys, version);
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
clientBuilder.getRegionManager().invalidateRegion(batch.region.getId());
logger.warn("ReSplitting ranges for BatchGetRequest", e);

// retry
return doSendBatchGetWithRefetchRegion(backOffer, batch, version);
}
} else {
return doSendBatchGetWithRefetchRegion(backOffer, batch, version);
}
}

private List<KvPair> doSendBatchGetWithRefetchRegion(
BackOffer backOffer, Batch batch, long version) {
Map<TiRegion, List<ByteString>> groupKeys = groupKeysByRegion(batch.keys);
List<Batch> retryBatches = new ArrayList<>();

for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(retryBatches, entry.getKey(), entry.getValue(), BATCH_GET_SIZE);
}

ArrayList<KvPair> results = new ArrayList<>();
for (Batch retryBatch : retryBatches) {
// recursive calls
List<KvPair> batchResult = doSendBatchGetInBatchesWithRetry(backOffer, retryBatch, version);
results.addAll(batchResult);
}
return results;
}

/**
* Append batch to list and split them according to batch limit
*
* @param batches a grouped batch
* @param region region
* @param keys keys
* @param limit batch max limit
* @param batchGetMaxSizeInByte batch max limit
*/
private void appendBatches(
List<Batch> batches, TiRegion region, List<ByteString> keys, int limit) {
List<ByteString> tmpKeys = new ArrayList<>();
for (int i = 0; i < keys.size(); i++) {
if (i >= limit) {
batches.add(new Batch(region, tmpKeys));
tmpKeys.clear();
}
tmpKeys.add(keys.get(i));
List<Batch> batches, TiRegion region, List<ByteString> keys, int batchGetMaxSizeInByte) {
int start;
int end;
if (keys == null) {
return;
}
if (!tmpKeys.isEmpty()) {
batches.add(new Batch(region, tmpKeys));
int len = keys.size();
for (start = 0; start < len; start = end) {
int size = 0;
for (end = start; end < len && size < batchGetMaxSizeInByte; end++) {
size += keys.get(end).size();
}
Batch batch = new Batch(region, keys.subList(start, end));
batches.add(batch);
}
}

@@ -178,55 +239,11 @@ private void appendBatches(
* @param keys keys
* @return a mapping of keys and their region
*/
private Map<TiRegion, List<ByteString>> groupKeysByRegion(Set<ByteString> keys) {
private Map<TiRegion, List<ByteString>> groupKeysByRegion(List<ByteString> keys) {
return keys.stream()
.collect(Collectors.groupingBy(clientBuilder.getRegionManager()::getRegionByKey));
}

/**
* Send batchPut request concurrently
*
* @param backOffer current backOffer
* @param batches list of batch to send
*/
private List<KvPair> sendBatchGet(BackOffer backOffer, List<Batch> batches, long version) {
ExecutorCompletionService<List<KvPair>> completionService =
new ExecutorCompletionService<>(executorService);
for (Batch batch : batches) {
completionService.submit(
() -> {
RegionStoreClient client = clientBuilder.build(batch.region);
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
List<ByteString> keys = batch.keys;
try {
return client.batchGet(singleBatchBackOffer, keys, version);
} catch (final TiKVException e) {
// TODO: any elegant way to re-split the ranges if fails?
singleBatchBackOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
clientBuilder.getRegionManager().invalidateRegion(batch.region.getId());
logger.warn("ReSplitting ranges for BatchGetRequest");
// recursive calls
return batchGet(singleBatchBackOffer, batch.keys, version);
}
});
}
try {
List<KvPair> result = new ArrayList<>();
for (int i = 0; i < batches.size(); i++) {
result.addAll(
completionService.take().get(BackOffer.BATCH_GET_MAX_BACKOFF, TimeUnit.SECONDS));
}
return result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TiKVException("Current thread interrupted.", e);
} catch (TimeoutException e) {
throw new TiKVException("TimeOut Exceeded for current operation. ", e);
} catch (ExecutionException e) {
throw new TiKVException("Execution exception met.", e);
}
}

private Iterator<Kvrpcpb.KvPair> scanIterator(
TiConfiguration conf,
RegionStoreClientBuilder builder,
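
After this change, the public batchGet takes an explicit BackOffer from the caller, and region-miss retries re-split the batch and re-fetch region info under that same budget. A minimal, hypothetical call sketch follows; the wrapper class, the method name, and the Kvrpcpb package path are assumptions, while the KVClient, ConcreteBackOffer, and batchGet calls mirror the ones visible in this diff.

import com.google.protobuf.ByteString;
import com.pingcap.tikv.KVClient;
import com.pingcap.tikv.TiSession;
import com.pingcap.tikv.kvproto.Kvrpcpb.KvPair; // package path assumed
import com.pingcap.tikv.util.BackOffer;
import com.pingcap.tikv.util.ConcreteBackOffer;
import java.util.List;

public class BatchGetCallSketch {
  // Sketch only: `session`, `keys`, and `version` are supplied by the caller; the
  // back-off budget is given in milliseconds, matching batchGetBackOfferMS.
  static List<KvPair> batchGetWithBudget(
      TiSession session, List<ByteString> keys, long version, int backOffMs) throws Exception {
    BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(backOffMs);
    try (KVClient client =
        new KVClient(session.getConf(), session.getRegionStoreClientBuilder())) {
      return client.batchGet(backOffer, keys, version);
    }
  }
}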
5 changes: 3 additions & 2 deletions tikv-client/src/main/java/com/pingcap/tikv/Snapshot.java
@@ -27,6 +27,7 @@
import com.pingcap.tikv.operation.iterator.ConcreteScanIterator;
import com.pingcap.tikv.operation.iterator.IndexScanIterator;
import com.pingcap.tikv.row.Row;
import com.pingcap.tikv.util.ConcreteBackOffer;
import com.pingcap.tikv.util.RangeSplitter;
import com.pingcap.tikv.util.RangeSplitter.RegionTask;
import java.util.ArrayList;
@@ -71,13 +72,13 @@ public ByteString get(ByteString key) {
}
}

public List<BytePairWrapper> batchGet(List<byte[]> keys) {
public List<BytePairWrapper> batchGet(int backOffer, List<byte[]> keys) {
List<ByteString> list = new ArrayList<>();
for (byte[] key : keys) {
list.add(ByteString.copyFrom(key));
}
try (KVClient client = new KVClient(session.getConf(), session.getRegionStoreClientBuilder())) {
List<KvPair> kvPairList = client.batchGet(list, timestamp.getVersion());
List<KvPair> kvPairList = client.batchGet(ConcreteBackOffer.newCustomBackOff(backOffer), list, timestamp.getVersion());
return kvPairList
.stream()
.map(