Skip to content

Commit

Permalink
add group id for replicator logs (#997)
Browse files Browse the repository at this point in the history
  • Loading branch information
qiujiayu authored Jun 13, 2023
1 parent 3d989e3 commit 19ed179
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 39 deletions.
63 changes: 35 additions & 28 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ public Map<String, Metric> getMetrics() {
gauges.put("probe-times", (Gauge<Long>) () -> this.r.probeCounter);
gauges.put("block-times", (Gauge<Long>) () -> this.r.blockCounter);
gauges.put("append-entries-times", (Gauge<Long>) () -> this.r.appendEntriesCounter);
gauges.put("consecutive-error-times", (Gauge<Long>) () -> (long) this.r.consecutiveErrorTimes);
gauges.put("state", (Gauge<Long>) () -> (long) this.r.state.ordinal());
gauges.put("running-state", (Gauge<Long>) () -> (long) this.r.statInfo.runningState.ordinal());
gauges.put("locked", (Gauge<Long>) () -> (null == this.r.id ? -1L : this.r.id.isLocked() ? 1L : 0L));
return gauges;
}
}
Expand Down Expand Up @@ -242,7 +246,7 @@ public interface ReplicatorStateListener {
* @author boyan(boyan@antfin.com)
*
*/
enum ReplicatorState{
enum ReplicatorState {
/**
* The replicator is created.
*/
Expand Down Expand Up @@ -610,27 +614,28 @@ private void startHeartbeatTimer(final long startMs) {
this.heartbeatTimer = this.timerManager.schedule(() -> onTimeout(this.id), dueTime - Utils.nowMs(),
TimeUnit.MILLISECONDS);
} catch (final Exception e) {
LOG.error("Fail to schedule heartbeat timer", e);
LOG.error("Replicator {} fail to schedule heartbeat timer ", this, e);
onTimeout(this.id);
}
}

void installSnapshot() {
if (getState() == State.Snapshot) {
LOG.warn("Replicator {} is installing snapshot, ignore the new request.", this.options.getPeerId());
LOG.warn("Replicator {} is installing snapshot, ignore the new request.", this);
unlockId();
return;
}
boolean doUnlock = true;
if (!this.rpcService.connect(this.options.getPeerId().getEndpoint())) {
LOG.error("Fail to check install snapshot connection to peer={}, give up to send install snapshot request.", this.options.getPeerId().getEndpoint());
LOG.error("Fail to check install snapshot connection to peer={}, give up to send install snapshot request, group id {}.",
this.options.getPeerId().getEndpoint(), this.options.getGroupId());
block(Utils.nowMs(), RaftError.EHOSTDOWN.getNumber());
return;
}
try {
Requires.requireTrue(this.reader == null,
"Replicator %s already has a snapshot reader, current state is %s", this.options.getPeerId(),
getState());
"Replicator [%s, %s, %s] already has a snapshot reader, current state is %s",
this.options.getGroupId(), this.options.getPeerId(), this.options.getReplicatorType(), getState());
this.reader = this.options.getSnapshotStorage().open();
if (this.reader == null) {
final NodeImpl node = this.options.getNode();
Expand Down Expand Up @@ -718,7 +723,7 @@ static boolean onInstallSnapshotReturned(final ThreadId id, final Replicator r,
LOG.info(sb.toString());
notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
if ((r.consecutiveErrorTimes++) % 10 == 0) {
LOG.warn("Fail to install snapshot at peer={}, error={}", r.options.getPeerId(), status);
LOG.warn("Fail to install snapshot at peer={}, error={}, group id={}", r.options.getPeerId(), status, r.options.getGroupId());
}
success = false;
break;
Expand Down Expand Up @@ -771,7 +776,7 @@ private void sendEmptyEntries(final boolean isHeartbeat,
installSnapshot();
if (isHeartbeat && heartBeatClosure != null) {
RpcUtils.runClosureInThread(heartBeatClosure, new Status(RaftError.EAGAIN,
"Fail to send heartbeat to peer %s", this.options.getPeerId()));
"Fail to send heartbeat to peer %s, group %s", this.options.getPeerId(), this.options.getGroupId()));
}
return;
}
Expand Down Expand Up @@ -888,7 +893,7 @@ public static ThreadId start(final ReplicatorOptions opts, final RaftOptions raf
}
final Replicator r = new Replicator(opts, raftOptions);
if (!r.rpcService.connect(opts.getPeerId().getEndpoint())) {
LOG.error("Fail to init sending channel to {}.", opts.getPeerId());
LOG.error("Fail to init sending channel to {}, group: {}.", opts.getPeerId(), opts.getGroupId());
// Return and it will be retried later.
return null;
}
Expand All @@ -909,7 +914,7 @@ public static ThreadId start(final ReplicatorOptions opts, final RaftOptions raf
r.id = new ThreadId(r, r);
r.id.lock();
notifyReplicatorStatusListener(r, ReplicatorEvent.CREATED);
LOG.info("Replicator={}@{} is started", r.id, r.options.getPeerId());
LOG.info("Replicator [group: {}, peer: {}, type: {}] is started", r.options.getGroupId(), r.options.getPeerId(), r.options.getReplicatorType());
r.catchUpClosure = null;
r.lastRpcSendTimestamp = Utils.monotonicMs();
r.startHeartbeatTimer(Utils.nowMs());
Expand All @@ -932,7 +937,7 @@ public static void waitForCaughtUp(final String groupId, final ThreadId id, fina
}
try {
if (r.catchUpClosure != null) {
LOG.error("Previous wait_for_caught_up is not over");
LOG.error("Previous wait_for_caught_up is not over, peer: {}, group id: {}", r.options.getPeerId(), r.options.getGroupId());
ThreadPoolsFactory.runClosureInThread(groupId, done, new Status(RaftError.EINVAL, "Duplicated call"));
return;
}
Expand All @@ -950,7 +955,8 @@ public static void waitForCaughtUp(final String groupId, final ThreadId id, fina
@Override
public String toString() {
return "Replicator [state=" + getState() + ", statInfo=" + this.statInfo + ", peerId="
+ this.options.getPeerId() + ", waitId=" + this.waitId + ", type=" + this.options.getReplicatorType() + "]";
+ this.options.getPeerId() + ", groupId=" + this.options.getGroupId() + ", waitId=" + this.waitId
+ ", type=" + this.options.getReplicatorType() + "]";
}

static void onBlockTimeoutInNewThread(final ThreadId id) {
Expand Down Expand Up @@ -1036,7 +1042,7 @@ void block(final long startTimeMs, @SuppressWarnings("unused") final int errorCo
unlockId();
} catch (final Exception e) {
this.blockTimer = null;
LOG.error("Fail to add timer", e);
LOG.error("Fail to add timer for replicator {}", this, e);
// id unlock in sendEmptyEntries.
sendProbeRequest();
}
Expand Down Expand Up @@ -1083,7 +1089,7 @@ public void onError(final ThreadId id, final Object data, final int errorCode) {
RpcUtils.runInThread(() -> sendHeartbeat(id));
} else {
// noinspection ConstantConditions
Requires.requireTrue(false, "Unknown error code for replicator: " + errorCode);
Requires.requireTrue(false, "Unknown error code " + errorCode + " for replicator: " + r);
}
}

Expand Down Expand Up @@ -1133,11 +1139,11 @@ private void notifyOnCaughtUp(final int code, final boolean beforeDestroy) {
RpcUtils.runClosureInThread(savedClosure, savedClosure.getStatus());
}

private static void onTimeout(final ThreadId id) {
private void onTimeout(final ThreadId id) {
if (id != null) {
id.setError(RaftError.ETIMEDOUT.getNumber());
} else {
LOG.warn("Replicator id is null when timeout, maybe it's destroyed.");
LOG.warn("Replicator {} id is null when timeout, maybe it's destroyed.", this);
}
}

Expand Down Expand Up @@ -1199,8 +1205,8 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap
r.setState(State.Probe);
notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
if ((r.consecutiveErrorTimes++) % 10 == 0) {
LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(),
r.consecutiveErrorTimes, status);
LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}, groupId={}", r.options.getPeerId(),
r.consecutiveErrorTimes, status, r.options.getGroupId());
}
r.startHeartbeatTimer(startTimeMs);
return;
Expand All @@ -1218,7 +1224,7 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap
r.notifyOnCaughtUp(RaftError.EPERM.getNumber(), true);
r.destroy();
node.increaseTermTo(response.getTerm(), new Status(RaftError.EHIGHERTERMRESPONSE,
"Leader receives higher term heartbeat_response from peer:%s", r.options.getPeerId()));
"Leader receives higher term heartbeat_response from peer:%s, group:%s", r.options.getPeerId(), r.options.getGroupId()));
return;
}
if (!response.getSuccess() && response.hasLastLogIndex()) {
Expand All @@ -1229,7 +1235,7 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap
.append(response.getLastLogIndex());
LOG.debug(sb.toString());
}
LOG.warn("Heartbeat to peer {} failure, try to send a probe request.", r.options.getPeerId());
LOG.warn("Heartbeat to peer {} failure, try to send a probe request, groupId={}.", r.options.getPeerId(), r.options.getGroupId());
doUnlock = false;
r.sendProbeRequest();
r.startHeartbeatTimer(startTimeMs);
Expand Down Expand Up @@ -1274,7 +1280,7 @@ static void onRpcReturned(final ThreadId id, final RequestType reqType, final St

if (holdingQueue.size() > r.raftOptions.getMaxReplicatorInflightMsgs()) {
LOG.warn("Too many pending responses {} for replicator {}, maxReplicatorInflightMsgs={}",
holdingQueue.size(), r.options.getPeerId(), r.raftOptions.getMaxReplicatorInflightMsgs());
holdingQueue.size(), r, r.raftOptions.getMaxReplicatorInflightMsgs());
r.resetInflights();
r.setState(State.Probe);
r.sendProbeRequest();
Expand Down Expand Up @@ -1432,8 +1438,8 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight
}
notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
if ((r.consecutiveErrorTimes++) % 10 == 0) {
LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(),
r.consecutiveErrorTimes, status);
LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}, groupId={}", r.options.getPeerId(),
r.consecutiveErrorTimes, status, r.options.getGroupId());
}
r.resetInflights();
r.setState(State.Probe);
Expand Down Expand Up @@ -1469,7 +1475,7 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight
r.notifyOnCaughtUp(RaftError.EPERM.getNumber(), true);
r.destroy();
node.increaseTermTo(response.getTerm(), new Status(RaftError.EHIGHERTERMRESPONSE,
"Leader receives higher term heartbeat_response from peer:%s", r.options.getPeerId()));
"Leader receives higher term heartbeat_response from peer:%s, group:%s", r.options.getPeerId(), r.options.getGroupId()));
return false;
}
if (isLogDebugEnabled) {
Expand All @@ -1494,8 +1500,8 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight
LOG.debug("logIndex={} dismatch", r.nextIndex);
r.nextIndex--;
} else {
LOG.error("Peer={} declares that log at index=0 doesn't match, which is not supposed to happen",
r.options.getPeerId());
LOG.error("Peer={} declares that log at index=0 doesn't match, which is not supposed to happen, groupId={}",
r.options.getPeerId(), r.options.getGroupId());
}
}
// dummy_id is unlock in _send_heartbeat
Expand All @@ -1510,7 +1516,8 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight
if (response.getTerm() != r.options.getTerm()) {
r.resetInflights();
r.setState(State.Probe);
LOG.error("Fail, response term {} dismatch, expect term {}", response.getTerm(), r.options.getTerm());
LOG.error("Fail, response term {} dismatch, expect term {}, peer {}, group {}", response.getTerm(),
r.options.getTerm(), r.options.getPeerId(), r.options.getGroupId());
id.unlock();
return false;
}
Expand Down Expand Up @@ -1803,7 +1810,7 @@ static void onTimeoutNowReturned(final ThreadId id, final Status status, final T
r.notifyOnCaughtUp(RaftError.EPERM.getNumber(), true);
r.destroy();
node.increaseTermTo(response.getTerm(), new Status(RaftError.EHIGHERTERMRESPONSE,
"Leader receives higher term timeout_now_response from peer:%s", r.options.getPeerId()));
"Leader receives higher term timeout_now_response from peer:%s, group:%s", r.options.getPeerId(), r.options.getGroupId()));
return;
}
if (stopAfterFinish) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,16 @@ public boolean addReplicator(final PeerId peer, final ReplicatorType replicatorT
if (!sync) {
final RaftClientService client = opts.getRaftRpcService();
if (client != null && !client.checkConnection(peer.getEndpoint(), true)) {
LOG.error("Fail to check replicator connection to peer={}, replicatorType={}.", peer, replicatorType);
LOG.error("Fail to check replicator connection to peer={}, replicatorType={}, groupId={}.", peer,
replicatorType, this.commonOptions.getGroupId());
this.failureReplicators.put(peer, replicatorType);
return false;
}
}
final ThreadId rid = Replicator.start(opts, this.raftOptions);
if (rid == null) {
LOG.error("Fail to start replicator to peer={}, replicatorType={}.", peer, replicatorType);
LOG.error("Fail to start replicator to peer={}, replicatorType={}, groupId={}.", peer, replicatorType,
this.commonOptions.getGroupId());
this.failureReplicators.put(peer, replicatorType);
return false;
}
Expand Down Expand Up @@ -199,7 +201,7 @@ public void checkReplicator(final PeerId peer, final boolean lockNode) {

@Override
public boolean stopReplicator(final PeerId peer) {
LOG.info("Stop replicator to {}.", peer);
LOG.info("Stop replicator to {}, group id {}.", peer, this.commonOptions.getGroupId());
this.failureReplicators.remove(peer);
final ThreadId rid = this.replicatorMap.remove(peer);
if (rid == null) {
Expand Down Expand Up @@ -255,7 +257,7 @@ public ThreadId stopAllAndFindTheNextCandidate(final ConfigurationEntry conf) {
if (candidateId != null) {
candidate = this.replicatorMap.get(candidateId);
} else {
LOG.info("Fail to find the next candidate.");
LOG.info("Fail to find the next candidate, group {}.", this.commonOptions.getGroupId());
}
for (final ThreadId r : this.replicatorMap.values()) {
if (r != candidate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,8 @@ public void setError(final int errorCode) {
}
}
}

public boolean isLocked() {
return this.lock.isLocked();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public void testMetricRemoveOnDestroy() {
assertNotNull(r);
assertSame(r.getOpts(), this.opts);
Set<String> metrics = this.opts.getNode().getNodeMetrics().getMetricRegistry().getNames();
assertEquals(8, metrics.size());
assertEquals(12, metrics.size());
r.destroy();
metrics = this.opts.getNode().getNodeMetrics().getMetricRegistry().getNames();
assertEquals(0, metrics.size());
Expand Down Expand Up @@ -248,8 +248,9 @@ public void testOnRpcReturnedTermMismatch() {
Utils.monotonicMs());
Mockito.verify(this.node).increaseTermTo(
2,
new Status(RaftError.EHIGHERTERMRESPONSE, "Leader receives higher term heartbeat_response from peer:%s",
this.peerId));
new Status(RaftError.EHIGHERTERMRESPONSE,
"Leader receives higher term heartbeat_response from peer:%s, group:%s", this.peerId, this.node
.getGroupId()));
assertNull(r.id);
}

Expand Down Expand Up @@ -504,8 +505,9 @@ public void testOnHeartbeatReturnedTermMismatch() {
Replicator.onHeartbeatReturned(this.id, Status.OK(), request, response, Utils.monotonicMs());
Mockito.verify(this.node).increaseTermTo(
2,
new Status(RaftError.EHIGHERTERMRESPONSE, "Leader receives higher term heartbeat_response from peer:%s",
this.peerId));
new Status(RaftError.EHIGHERTERMRESPONSE,
"Leader receives higher term heartbeat_response from peer:%s, group:%s", this.peerId, this.node
.getGroupId()));
assertNull(r.id);
}

Expand Down Expand Up @@ -674,8 +676,9 @@ public void testOnTimeoutNowReturnedTermMismatch() {
Replicator.onTimeoutNowReturned(this.id, Status.OK(), request, response, false);
Mockito.verify(this.node).increaseTermTo(
12,
new Status(RaftError.EHIGHERTERMRESPONSE, "Leader receives higher term timeout_now_response from peer:%s",
this.peerId));
new Status(RaftError.EHIGHERTERMRESPONSE,
"Leader receives higher term timeout_now_response from peer:%s, group:%s", this.peerId, this.node
.getGroupId()));
assertNull(r.id);
}

Expand Down

0 comments on commit 19ed179

Please sign in to comment.