diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java index 4b927b797..06c001188 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java @@ -34,6 +34,7 @@ import com.alipay.sofa.jraft.Node; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.closure.CatchUpClosure; +import com.alipay.sofa.jraft.core.Replicator.ReplicatorStateListener.ReplicatorState; import com.alipay.sofa.jraft.entity.EnumOutter; import com.alipay.sofa.jraft.entity.LogEntry; import com.alipay.sofa.jraft.entity.PeerId; @@ -145,11 +146,12 @@ private int getAndIncrementRequiredNextSeq() { } /** - * Replicator state + * Replicator internal state * @author dennis * */ public enum State { + Created, Probe, // probe follower state Snapshot, // installing snapshot to follower Replicate, // replicate logs normally @@ -164,6 +166,7 @@ public Replicator(final ReplicatorOptions replicatorOptions, final RaftOptions r this.timerManager = replicatorOptions.getTimerManager(); this.raftOptions = raftOptions; this.rpcService = replicatorOptions.getRaftRpcService(); + setState(State.Created); } /** @@ -209,7 +212,8 @@ enum RunningState { enum ReplicatorEvent { CREATED, // created ERROR, // error - DESTROYED // destroyed + DESTROYED ,// destroyed + STATE_CHANGED; // state changed. } /** @@ -222,6 +226,31 @@ enum ReplicatorEvent { */ public interface ReplicatorStateListener { + /** + * Represents state changes in the replicator. + * @author boyan(boyan@antfin.com) + * + */ + enum ReplicatorState{ + /** + * The replicator is created. + */ + CREATED, + /** + * The replicator is destroyed. + */ + DESTROYED, + /** + * The replicator begins doing its job (replicating logs or installing snapshot). + */ + ONLINE, + /** + * The replicator is suspended by a raft error or a lost connection. 
+ */ + OFFLINE + } + + /** * Called when this replicator has been created. * @@ -243,6 +272,19 @@ public interface ReplicatorStateListener { * @param peer replicator related peerId */ void onDestroyed(final PeerId peer); + + /** + * Called when the replicator state is changed. See {@link ReplicatorState} + * @param peer the replicator's peer id. + * @param newState the new replicator state. + * @since 1.3.6 + */ + default void stateChanged(final PeerId peer, final ReplicatorState newState) {} + } + + private static void notifyReplicatorStatusListener(final Replicator replicator, final ReplicatorEvent event, + final Status status) { + notifyReplicatorStatusListener(replicator, event, status, null); } /** @@ -253,7 +295,7 @@ public interface ReplicatorStateListener { * @param status replicator's error detailed status */ private static void notifyReplicatorStatusListener(final Replicator replicator, final ReplicatorEvent event, - final Status status) { + final Status status, final ReplicatorState newState) { final ReplicatorOptions replicatorOpts = Requires.requireNonNull(replicator.getOpts(), "replicatorOptions"); final Node node = Requires.requireNonNull(replicatorOpts.getNode(), "node"); final PeerId peer = Requires.requireNonNull(replicatorOpts.getPeerId(), "peer"); @@ -265,7 +307,7 @@ private static void notifyReplicatorStatusListener(final Replicator replicator, try { switch (event) { case CREATED: - RpcUtils.runInThread(() -> listener.onCreated(peer)); + RpcUtils.runInThread(() -> listener.onCreated(peer)); break; case ERROR: RpcUtils.runInThread(() -> listener.onError(peer, status)); @@ -273,6 +315,8 @@ private static void notifyReplicatorStatusListener(final Replicator replicator, case DESTROYED: RpcUtils.runInThread(() -> listener.onDestroyed(peer)); break; + case STATE_CHANGED: + RpcUtils.runInThread(() -> listener.stateChanged(peer, newState)); /* NOTE(review): no break after this case; it falls through to default, which only breaks - confirm this is intended */ default: break; } @@ -406,14 +450,36 @@ ArrayDeque getInflights() { return this.inflights; } - @OnlyForTest State 
getState() { return this.state; } - @OnlyForTest void setState(final State state) { + State oldState = this.state; this.state = state; + + if(oldState != state) { /* notify listeners only when the state actually changes */ + ReplicatorState newState = null; + switch(state) { + case Created: + newState = ReplicatorState.CREATED; + break; + case Replicate: + case Snapshot: + newState = ReplicatorState.ONLINE; + break; + case Probe: + newState = ReplicatorState.OFFLINE; + break; + case Destroyed: + newState = ReplicatorState.DESTROYED; + break; + } + + if(newState != null) { + notifyReplicatorStatusListener(this, ReplicatorEvent.STATE_CHANGED, null, newState); + } + } } @OnlyForTest @@ -539,7 +605,7 @@ private void startHeartbeatTimer(final long startMs) { } void installSnapshot() { - if (this.state == State.Snapshot) { + if (getState() == State.Snapshot) { LOG.warn("Replicator {} is installing snapshot, ignore the new request.", this.options.getPeerId()); this.id.unlock(); return; @@ -548,7 +614,7 @@ void installSnapshot() { try { Requires.requireTrue(this.reader == null, "Replicator %s already has a snapshot reader, current state is %s", this.options.getPeerId(), - this.state); + getState()); this.reader = this.options.getSnapshotStorage().open(); if (this.reader == null) { final NodeImpl node = this.options.getNode(); @@ -595,7 +661,7 @@ void installSnapshot() { this.statInfo.lastTermIncluded = meta.getLastIncludedTerm(); final InstallSnapshotRequest request = rb.build(); - this.state = State.Snapshot; + setState(State.Snapshot); // noinspection NonAtomicOperationOnVolatileField this.installSnapshotCounter++; final long monotonicSendTimeMs = Utils.monotonicMs(); @@ -657,7 +723,7 @@ static boolean onInstallSnapshotReturned(final ThreadId id, final Replicator r, if (!success) { //should reset states r.resetInflights(); - r.state = State.Probe; + r.setState(State.Probe); r.block(Utils.nowMs(), status.getCode()); return false; } @@ -667,7 +733,7 @@ static boolean onInstallSnapshotReturned(final ThreadId id, final Replicator r, 
r.sendTimeoutNow(false, false); } // id is unlock in _send_entriesheartbeatCounter - r.state = State.Replicate; + r.setState(State.Replicate); return true; } @@ -725,7 +791,7 @@ public void run(final Status status) { this.statInfo.firstLogIndex = this.nextIndex; this.statInfo.lastLogIndex = this.nextIndex - 1; this.appendEntriesCounter++; - this.state = State.Probe; + setState(State.Probe); final int stateVersion = this.version; final int seq = getAndIncrementReqSeq(); final Future rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(), @@ -868,7 +934,7 @@ public static void waitForCaughtUp(final ThreadId id, final long maxMargin, fina @Override public String toString() { - return "Replicator [state=" + this.state + ", statInfo=" + this.statInfo + ", peerId=" + return "Replicator [state=" + getState() + ", statInfo=" + this.statInfo + ", peerId=" + this.options.getPeerId() + ", type=" + this.options.getReplicatorType() + "]"; } @@ -1068,7 +1134,7 @@ void destroy() { this.nodeMetrics.getMetricRegistry() // .removeMatching(MetricFilter.startsWith(getReplicatorMetricName(this.options))); } - this.state = State.Destroyed; + setState(State.Destroyed); notifyReplicatorStatusListener((Replicator) savedId.getData(), ReplicatorEvent.DESTROYED); savedId.unlockAndDestroy(); this.id = null; @@ -1114,7 +1180,7 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap .append(status); LOG.debug(sb.toString()); } - r.state = State.Probe; + r.setState(State.Probe); notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status); if (++r.consecutiveErrorTimes % 10 == 0) { LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(), @@ -1194,7 +1260,7 @@ static void onRpcReturned(final ThreadId id, final RequestType reqType, final St LOG.warn("Too many pending responses {} for replicator {}, maxReplicatorInflightMsgs={}", holdingQueue.size(), r.options.getPeerId(), 
r.raftOptions.getMaxReplicatorInflightMsgs()); r.resetInflights(); - r.state = State.Probe; + r.setState(State.Probe); r.sendEmptyEntries(false); return; } @@ -1247,7 +1313,7 @@ static void onRpcReturned(final ThreadId id, final RequestType reqType, final St "Replicator {} response sequence out of order, expect {}, but it is {}, reset state to try again.", r, inflight.seq, queuedPipelinedResponse.seq); r.resetInflights(); - r.state = State.Probe; + r.setState(State.Probe); continueSendEntries = false; r.block(Utils.nowMs(), RaftError.EREQUEST.getNumber()); return; @@ -1309,7 +1375,7 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight "Replicator {} received invalid AppendEntriesResponse, in-flight startIndex={}, request prevLogIndex={}, reset the replicator state and probe again.", r, inflight.startIndex, request.getPrevLogIndex()); r.resetInflights(); - r.state = State.Probe; + r.setState(State.Probe); // unlock id in sendEmptyEntries r.sendEmptyEntries(false); return false; @@ -1354,7 +1420,7 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight r.consecutiveErrorTimes, status); } r.resetInflights(); - r.state = State.Probe; + r.setState(State.Probe); // unlock in in block r.block(startTimeMs, status.getCode()); return false; @@ -1413,7 +1479,7 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight // success if (response.getTerm() != r.options.getTerm()) { r.resetInflights(); - r.state = State.Probe; + r.setState(State.Probe); LOG.error("Fail, response term {} dismatch, expect term {}", response.getTerm(), r.options.getTerm()); id.unlock(); return false; @@ -1433,7 +1499,7 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight } } - r.state = State.Replicate; + r.setState(State.Replicate); r.blockTimer = null; r.nextIndex += entriesSize; r.hasSucceeded = true; diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java 
b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java index b1fc1edd6..9d3e5b3aa 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java @@ -524,6 +524,11 @@ public void onCreated(final PeerId peer) { NodeTest.this.startedCounter.incrementAndGet(); } + @Override + public void stateChanged(final PeerId peer, final ReplicatorState newState) { + LOG.info("Replicator {} state is changed into {}.", peer, newState); + } + @Override public void onError(final PeerId peer, final Status status) { LOG.info("Replicator has errors");