From 42d553619d20a3763ce06d465ba011f9dc1752d7 Mon Sep 17 00:00:00 2001 From: qiujiayu <153163285@qq.com> Date: Mon, 12 Jun 2023 11:13:11 +0800 Subject: [PATCH] add group id for replicator logs --- .../alipay/sofa/jraft/core/Replicator.java | 63 ++++++++++--------- .../sofa/jraft/core/ReplicatorGroupImpl.java | 10 +-- .../com/alipay/sofa/jraft/util/ThreadId.java | 4 ++ .../sofa/jraft/core/ReplicatorTest.java | 17 ++--- 4 files changed, 55 insertions(+), 39 deletions(-) diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java index b40c323a3..f44543826 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java @@ -203,6 +203,10 @@ public Map getMetrics() { gauges.put("probe-times", (Gauge) () -> this.r.probeCounter); gauges.put("block-times", (Gauge) () -> this.r.blockCounter); gauges.put("append-entries-times", (Gauge) () -> this.r.appendEntriesCounter); + gauges.put("consecutive-error-times", (Gauge) () -> (long) this.r.consecutiveErrorTimes); + gauges.put("state", (Gauge) () -> (long) this.r.state.ordinal()); + gauges.put("running-state", (Gauge) () -> (long) this.r.statInfo.runningState.ordinal()); + gauges.put("locked", (Gauge) () -> (null == this.r.id ? -1L : this.r.id.isLocked() ? 1L : 0L)); return gauges; } } @@ -242,7 +246,7 @@ public interface ReplicatorStateListener { * @author boyan(boyan@antfin.com) * */ - enum ReplicatorState{ + enum ReplicatorState { /** * The replicator is created. */ @@ -610,27 +614,28 @@ private void startHeartbeatTimer(final long startMs) { this.heartbeatTimer = this.timerManager.schedule(() -> onTimeout(this.id), dueTime - Utils.nowMs(), TimeUnit.MILLISECONDS); } catch (final Exception e) { - LOG.error("Fail to schedule heartbeat timer", e); + LOG.error("Replicator {} fail to schedule heartbeat timer ", this, e); onTimeout(this.id); } } void installSnapshot() { if (getState() == State.Snapshot) { - LOG.warn("Replicator {} is installing snapshot, ignore the new request.", this.options.getPeerId()); + LOG.warn("Replicator {} is installing snapshot, ignore the new request.", this); unlockId(); return; } boolean doUnlock = true; if (!this.rpcService.connect(this.options.getPeerId().getEndpoint())) { - LOG.error("Fail to check install snapshot connection to peer={}, give up to send install snapshot request.", this.options.getPeerId().getEndpoint()); + LOG.error("Fail to check install snapshot connection to peer={}, give up to send install snapshot request, group id {}.", + this.options.getPeerId().getEndpoint(), this.options.getGroupId()); block(Utils.nowMs(), RaftError.EHOSTDOWN.getNumber()); return; } try { Requires.requireTrue(this.reader == null, - "Replicator %s already has a snapshot reader, current state is %s", this.options.getPeerId(), - getState()); + "Replicator [%s, %s, %s] already has a snapshot reader, current state is %s", + this.options.getGroupId(), this.options.getPeerId(), this.options.getReplicatorType(), getState()); this.reader = this.options.getSnapshotStorage().open(); if (this.reader == null) { final NodeImpl node = this.options.getNode(); @@ -718,7 +723,7 @@ static boolean onInstallSnapshotReturned(final ThreadId id, final Replicator r, LOG.info(sb.toString()); notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status); if ((r.consecutiveErrorTimes++) % 10 == 0) { - LOG.warn("Fail to install snapshot at peer={}, error={}", r.options.getPeerId(), status); + LOG.warn("Fail to install snapshot at peer={}, error={}, group id={}", r.options.getPeerId(), status, r.options.getGroupId()); } success = false; break; @@ -771,7 +776,7 @@ private void sendEmptyEntries(final boolean isHeartbeat, installSnapshot(); if (isHeartbeat && heartBeatClosure != null) { RpcUtils.runClosureInThread(heartBeatClosure, new Status(RaftError.EAGAIN, - "Fail to send heartbeat to peer %s", this.options.getPeerId())); + "Fail to send heartbeat to peer %s, group %s", this.options.getPeerId(), this.options.getGroupId())); } return; } @@ -888,7 +893,7 @@ public static ThreadId start(final ReplicatorOptions opts, final RaftOptions raf } final Replicator r = new Replicator(opts, raftOptions); if (!r.rpcService.connect(opts.getPeerId().getEndpoint())) { - LOG.error("Fail to init sending channel to {}.", opts.getPeerId()); + LOG.error("Fail to init sending channel to {}, group: {}.", opts.getPeerId(), opts.getGroupId()); // Return and it will be retried later. return null; } @@ -909,7 +914,7 @@ public static ThreadId start(final ReplicatorOptions opts, final RaftOptions raf r.id = new ThreadId(r, r); r.id.lock(); notifyReplicatorStatusListener(r, ReplicatorEvent.CREATED); - LOG.info("Replicator={}@{} is started", r.id, r.options.getPeerId()); + LOG.info("Replicator [group: {}, peer: {}, type: {}] is started", r.options.getGroupId(), r.options.getPeerId(), r.options.getReplicatorType()); r.catchUpClosure = null; r.lastRpcSendTimestamp = Utils.monotonicMs(); r.startHeartbeatTimer(Utils.nowMs()); @@ -932,7 +937,7 @@ public static void waitForCaughtUp(final String groupId, final ThreadId id, fina } try { if (r.catchUpClosure != null) { - LOG.error("Previous wait_for_caught_up is not over"); + LOG.error("Previous wait_for_caught_up is not over, peer: {}, group id: {}", r.options.getPeerId(), r.options.getGroupId()); ThreadPoolsFactory.runClosureInThread(groupId, done, new Status(RaftError.EINVAL, "Duplicated call")); return; } @@ -950,7 +955,8 @@ public static void waitForCaughtUp(final String groupId, final ThreadId id, fina @Override public String toString() { return "Replicator [state=" + getState() + ", statInfo=" + this.statInfo + ", peerId=" - + this.options.getPeerId() + ", waitId=" + this.waitId + ", type=" + this.options.getReplicatorType() + "]"; + + this.options.getPeerId() + ", groupId=" + this.options.getGroupId() + ", waitId=" + this.waitId + + ", type=" + this.options.getReplicatorType() + "]"; } static void onBlockTimeoutInNewThread(final ThreadId id) { @@ -1036,7 +1042,7 @@ void block(final long startTimeMs, @SuppressWarnings("unused") final int errorCo unlockId(); } catch (final Exception e) { this.blockTimer = null; - LOG.error("Fail to add timer", e); + LOG.error("Fail to add timer for replicator {}", this, e); // id unlock in sendEmptyEntries. sendProbeRequest(); } @@ -1083,7 +1089,7 @@ public void onError(final ThreadId id, final Object data, final int errorCode) { RpcUtils.runInThread(() -> sendHeartbeat(id)); } else { // noinspection ConstantConditions - Requires.requireTrue(false, "Unknown error code for replicator: " + errorCode); + Requires.requireTrue(false, "Unknown error code " + errorCode + " for replicator: " + r); } } @@ -1133,11 +1139,11 @@ private void notifyOnCaughtUp(final int code, final boolean beforeDestroy) { RpcUtils.runClosureInThread(savedClosure, savedClosure.getStatus()); } - private static void onTimeout(final ThreadId id) { + private void onTimeout(final ThreadId id) { if (id != null) { id.setError(RaftError.ETIMEDOUT.getNumber()); } else { - LOG.warn("Replicator id is null when timeout, maybe it's destroyed."); + LOG.warn("Replicator {} id is null when timeout, maybe it's destroyed.", this); } } @@ -1199,8 +1205,8 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap r.setState(State.Probe); notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status); if ((r.consecutiveErrorTimes++) % 10 == 0) { - LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(), - r.consecutiveErrorTimes, status); + LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}, groupId={}", r.options.getPeerId(), + r.consecutiveErrorTimes, status, r.options.getGroupId()); } r.startHeartbeatTimer(startTimeMs); return; @@ -1218,7 +1224,7 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap r.notifyOnCaughtUp(RaftError.EPERM.getNumber(), true); r.destroy(); node.increaseTermTo(response.getTerm(), new Status(RaftError.EHIGHERTERMRESPONSE, - "Leader receives higher term heartbeat_response from peer:%s", r.options.getPeerId())); + "Leader receives higher term heartbeat_response from peer:%s, group:%s", r.options.getPeerId(), r.options.getGroupId())); return; } if (!response.getSuccess() && response.hasLastLogIndex()) { @@ -1229,7 +1235,7 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap .append(response.getLastLogIndex()); LOG.debug(sb.toString()); } - LOG.warn("Heartbeat to peer {} failure, try to send a probe request.", r.options.getPeerId()); + LOG.warn("Heartbeat to peer {} failure, try to send a probe request, groupId={}.", r.options.getPeerId(), r.options.getGroupId()); doUnlock = false; r.sendProbeRequest(); r.startHeartbeatTimer(startTimeMs); @@ -1274,7 +1280,7 @@ static void onRpcReturned(final ThreadId id, final RequestType reqType, final St if (holdingQueue.size() > r.raftOptions.getMaxReplicatorInflightMsgs()) { LOG.warn("Too many pending responses {} for replicator {}, maxReplicatorInflightMsgs={}", - holdingQueue.size(), r.options.getPeerId(), r.raftOptions.getMaxReplicatorInflightMsgs()); + holdingQueue.size(), r, r.raftOptions.getMaxReplicatorInflightMsgs()); r.resetInflights(); r.setState(State.Probe); r.sendProbeRequest(); @@ -1432,8 +1438,8 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight } notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status); if ((r.consecutiveErrorTimes++) % 10 == 0) { - LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(), - r.consecutiveErrorTimes, status); + LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}, groupId={}", r.options.getPeerId(), + r.consecutiveErrorTimes, status, r.options.getGroupId()); } r.resetInflights(); r.setState(State.Probe); @@ -1469,7 +1475,7 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight r.notifyOnCaughtUp(RaftError.EPERM.getNumber(), true); r.destroy(); node.increaseTermTo(response.getTerm(), new Status(RaftError.EHIGHERTERMRESPONSE, - "Leader receives higher term heartbeat_response from peer:%s", r.options.getPeerId())); + "Leader receives higher term heartbeat_response from peer:%s, group:%s", r.options.getPeerId(), r.options.getGroupId())); return false; } if (isLogDebugEnabled) { @@ -1494,8 +1500,8 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight LOG.debug("logIndex={} dismatch", r.nextIndex); r.nextIndex--; } else { - LOG.error("Peer={} declares that log at index=0 doesn't match, which is not supposed to happen", - r.options.getPeerId()); + LOG.error("Peer={} declares that log at index=0 doesn't match, which is not supposed to happen, groupId={}", + r.options.getPeerId(), r.options.getGroupId()); } } // dummy_id is unlock in _send_heartbeat @@ -1510,7 +1516,8 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight if (response.getTerm() != r.options.getTerm()) { r.resetInflights(); r.setState(State.Probe); - LOG.error("Fail, response term {} dismatch, expect term {}", response.getTerm(), r.options.getTerm()); + LOG.error("Fail, response term {} dismatch, expect term {}, peer {}, group {}", response.getTerm(), + r.options.getTerm(), r.options.getPeerId(), r.options.getGroupId()); id.unlock(); return false; } @@ -1803,7 +1810,7 @@ static void onTimeoutNowReturned(final ThreadId id, final Status status, final T r.notifyOnCaughtUp(RaftError.EPERM.getNumber(), true); r.destroy(); node.increaseTermTo(response.getTerm(), new Status(RaftError.EHIGHERTERMRESPONSE, - "Leader receives higher term timeout_now_response from peer:%s", r.options.getPeerId())); + "Leader receives higher term timeout_now_response from peer:%s, group:%s", r.options.getPeerId(), r.options.getGroupId())); return; } if (stopAfterFinish) { diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReplicatorGroupImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReplicatorGroupImpl.java index 349444c8c..e111c2f14 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReplicatorGroupImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReplicatorGroupImpl.java @@ -122,14 +122,16 @@ public boolean addReplicator(final PeerId peer, final ReplicatorType replicatorT if (!sync) { final RaftClientService client = opts.getRaftRpcService(); if (client != null && !client.checkConnection(peer.getEndpoint(), true)) { - LOG.error("Fail to check replicator connection to peer={}, replicatorType={}.", peer, replicatorType); + LOG.error("Fail to check replicator connection to peer={}, replicatorType={}, groupId={}.", peer, + replicatorType, this.commonOptions.getGroupId()); this.failureReplicators.put(peer, replicatorType); return false; } } final ThreadId rid = Replicator.start(opts, this.raftOptions); if (rid == null) { - LOG.error("Fail to start replicator to peer={}, replicatorType={}.", peer, replicatorType); + LOG.error("Fail to start replicator to peer={}, replicatorType={}, groupId={}.", peer, replicatorType, + this.commonOptions.getGroupId()); this.failureReplicators.put(peer, replicatorType); return false; } @@ -199,7 +201,7 @@ public void checkReplicator(final PeerId peer, final boolean lockNode) { @Override public boolean stopReplicator(final PeerId peer) { - LOG.info("Stop replicator to {}.", peer); + LOG.info("Stop replicator to {}, group id {}.", peer, this.commonOptions.getGroupId()); this.failureReplicators.remove(peer); final ThreadId rid = this.replicatorMap.remove(peer); if (rid == null) { @@ -255,7 +257,7 @@ public ThreadId stopAllAndFindTheNextCandidate(final ConfigurationEntry conf) { if (candidateId != null) { candidate = this.replicatorMap.get(candidateId); } else { - LOG.info("Fail to find the next candidate."); + LOG.info("Fail to find the next candidate, group {}.", this.commonOptions.getGroupId()); } for (final ThreadId r : this.replicatorMap.values()) { if (r != candidate) { diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/ThreadId.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/ThreadId.java index e63a1554f..6ee10d5d1 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/ThreadId.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/ThreadId.java @@ -132,4 +132,8 @@ public void setError(final int errorCode) { } } } + + public boolean isLocked() { + return this.lock.isLocked(); + } } diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReplicatorTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReplicatorTest.java index 552fef76a..5ac016ab4 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReplicatorTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReplicatorTest.java @@ -170,7 +170,7 @@ public void testMetricRemoveOnDestroy() { assertNotNull(r); assertSame(r.getOpts(), this.opts); Set metrics = this.opts.getNode().getNodeMetrics().getMetricRegistry().getNames(); - assertEquals(8, metrics.size()); + assertEquals(12, metrics.size()); r.destroy(); metrics = this.opts.getNode().getNodeMetrics().getMetricRegistry().getNames(); assertEquals(0, metrics.size()); @@ -248,8 +248,9 @@ public void testOnRpcReturnedTermMismatch() { Utils.monotonicMs()); Mockito.verify(this.node).increaseTermTo( 2, - new Status(RaftError.EHIGHERTERMRESPONSE, "Leader receives higher term heartbeat_response from peer:%s", - this.peerId)); + new Status(RaftError.EHIGHERTERMRESPONSE, + "Leader receives higher term heartbeat_response from peer:%s, group:%s", this.peerId, this.node + .getGroupId())); assertNull(r.id); } @@ -504,8 +505,9 @@ public void testOnHeartbeatReturnedTermMismatch() { Replicator.onHeartbeatReturned(this.id, Status.OK(), request, response, Utils.monotonicMs()); Mockito.verify(this.node).increaseTermTo( 2, - new Status(RaftError.EHIGHERTERMRESPONSE, "Leader receives higher term heartbeat_response from peer:%s", - this.peerId)); + new Status(RaftError.EHIGHERTERMRESPONSE, + "Leader receives higher term heartbeat_response from peer:%s, group:%s", this.peerId, this.node + .getGroupId())); assertNull(r.id); } @@ -674,8 +676,9 @@ public void testOnTimeoutNowReturnedTermMismatch() { Replicator.onTimeoutNowReturned(this.id, Status.OK(), request, response, false); Mockito.verify(this.node).increaseTermTo( 12, - new Status(RaftError.EHIGHERTERMRESPONSE, "Leader receives higher term timeout_now_response from peer:%s", - this.peerId)); + new Status(RaftError.EHIGHERTERMRESPONSE, + "Leader receives higher term timeout_now_response from peer:%s, group:%s", this.peerId, this.node + .getGroupId())); assertNull(r.id); }