From 081c54ddda8e863ad4f782552a692125232eeaa7 Mon Sep 17 00:00:00 2001 From: zhangyang21 Date: Fri, 21 Jan 2022 11:31:30 +0800 Subject: [PATCH] [ISSUE #85] fix preferred leader bug Signed-off-by: zhangyang21 --- .../openmessaging/storage/dledger/DLedgerConfig.java | 9 +++++++++ .../openmessaging/storage/dledger/DLedgerServer.java | 10 +++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java b/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java index 15b68d61..81e58966 100644 --- a/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java +++ b/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java @@ -88,6 +88,7 @@ public class DLedgerConfig { private boolean isEnableBatchPush = false; private int maxBatchPushSize = 4 * 1024; + private long leadershipTransferWaitTimeout = 1000; public String getDefaultPath() { return storeBaseDir + File.separator + "dledger-" + selfId; @@ -398,4 +399,12 @@ public int getMaxBatchPushSize() { public void setMaxBatchPushSize(int maxBatchPushSize) { this.maxBatchPushSize = maxBatchPushSize; } + + public long getLeadershipTransferWaitTimeout() { + return leadershipTransferWaitTimeout; + } + + public void setLeadershipTransferWaitTimeout(long leadershipTransferWaitTimeout) { + this.leadershipTransferWaitTimeout = leadershipTransferWaitTimeout; + } } diff --git a/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java b/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java index 24e16514..5bcc6855 100644 --- a/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java +++ b/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java @@ -296,7 +296,15 @@ public CompletableFuture handleLeadershipTransfer(Le } else if (memberState.getSelfId().equals(request.getTransfereeId())) { // It's the transferee received the take leadership command. PreConditions.check(request.getTransferId().equals(memberState.getLeaderId()), DLedgerResponseCode.INCONSISTENT_LEADER, "transfer=%s is not leader", request.getTransferId()); - + long startTime = System.currentTimeMillis(); + long fallBehind; + while ((fallBehind = request.getTakeLeadershipLedgerIndex() - memberState.getLedgerEndIndex()) > 0) { + if (System.currentTimeMillis() - startTime > dLedgerConfig.getLeadershipTransferWaitTimeout()) { + throw new DLedgerException(DLedgerResponseCode.TAKE_LEADERSHIP_FAILED, "transferee fall behind, wait timeout. timeout={}, diff={}", dLedgerConfig.getLeadershipTransferWaitTimeout(), fallBehind); + } + logger.warn("transferee fall behind, wait 1 ms, diff={}.", fallBehind); + Thread.sleep(1); + } return dLedgerLeaderElector.handleTakeLeadership(request); } else { return CompletableFuture.completedFuture(new LeadershipTransferResponse().term(memberState.currTerm()).code(DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode()));