Skip to content

Commit

Permalink
(feat) Adds new method stateChanged(peerId, newState) for ReplicatorS…
Browse files Browse the repository at this point in the history
…tateListener, #521
  • Loading branch information
killme2008 committed Mar 16, 2021
1 parent 900da20 commit 8051538
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 21 deletions.
108 changes: 87 additions & 21 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.CatchUpClosure;
import com.alipay.sofa.jraft.core.Replicator.ReplicatorStateListener.ReplicatorState;
import com.alipay.sofa.jraft.entity.EnumOutter;
import com.alipay.sofa.jraft.entity.LogEntry;
import com.alipay.sofa.jraft.entity.PeerId;
Expand Down Expand Up @@ -145,11 +146,12 @@ private int getAndIncrementRequiredNextSeq() {
}

/**
* Replicator state
* Replicator internal state
* @author dennis
*
*/
public enum State {
Created,
Probe, // probe follower state
Snapshot, // installing snapshot to follower
Replicate, // replicate logs normally
Expand All @@ -164,6 +166,7 @@ public Replicator(final ReplicatorOptions replicatorOptions, final RaftOptions r
this.timerManager = replicatorOptions.getTimerManager();
this.raftOptions = raftOptions;
this.rpcService = replicatorOptions.getRaftRpcService();
setState(State.Created);
}

/**
Expand Down Expand Up @@ -209,7 +212,8 @@ enum RunningState {
enum ReplicatorEvent {
CREATED, // created
ERROR, // error
DESTROYED // destroyed
DESTROYED ,// destroyed
STATE_CHANGED; // state changed.
}

/**
Expand All @@ -222,6 +226,31 @@ enum ReplicatorEvent {
*/
public interface ReplicatorStateListener {

/**
* Represents state changes in the replicator.
* @author boyan(boyan@antfin.com)
*
*/
enum ReplicatorState{
/**
* The replicator is created.
*/
CREATED,
/**
* The replicator is destroyed.
*/
DESTROYED,
/**
* The replicator begins to doing it's job(replicating logs or installing snapshot).
*/
ONLINE,
/**
* The replicaotr is suspended by raft error or lost connection.
*/
OFFLINE
}


/**
* Called when this replicator has been created.
*
Expand All @@ -243,6 +272,19 @@ public interface ReplicatorStateListener {
* @param peer replicator related peerId
*/
void onDestroyed(final PeerId peer);

/**
* Called when the replicator state is changed. See {@link ReplicatorState}
* @param peer the replicator's peer id.
* @param newState the new replicator state.
* @since 1.3.6
*/
default void stateChanged(final PeerId peer, final ReplicatorState newState) {}
}

private static void notifyReplicatorStatusListener(final Replicator replicator, final ReplicatorEvent event,
final Status status) {
notifyReplicatorStatusListener(replicator, event, status, null);
}

/**
Expand All @@ -253,7 +295,7 @@ public interface ReplicatorStateListener {
* @param status replicator's error detailed status
*/
private static void notifyReplicatorStatusListener(final Replicator replicator, final ReplicatorEvent event,
final Status status) {
final Status status, final ReplicatorState newState) {
final ReplicatorOptions replicatorOpts = Requires.requireNonNull(replicator.getOpts(), "replicatorOptions");
final Node node = Requires.requireNonNull(replicatorOpts.getNode(), "node");
final PeerId peer = Requires.requireNonNull(replicatorOpts.getPeerId(), "peer");
Expand All @@ -265,14 +307,16 @@ private static void notifyReplicatorStatusListener(final Replicator replicator,
try {
switch (event) {
case CREATED:
RpcUtils.runInThread(() -> listener.onCreated(peer));
RpcUtils.runInThread(() -> listener.onCreated(peer));
break;
case ERROR:
RpcUtils.runInThread(() -> listener.onError(peer, status));
break;
case DESTROYED:
RpcUtils.runInThread(() -> listener.onDestroyed(peer));
break;
case STATE_CHANGED:
RpcUtils.runInThread(() -> listener.stateChanged(peer, newState));
default:
break;
}
Expand Down Expand Up @@ -406,14 +450,36 @@ ArrayDeque<Inflight> getInflights() {
return this.inflights;
}

@OnlyForTest
State getState() {
return this.state;
}

@OnlyForTest
void setState(final State state) {
State oldState = this.state;
this.state = state;

if(oldState != state) {
ReplicatorState newState = null;
switch(state) {
case Created:
newState = ReplicatorState.CREATED;
break;
case Replicate:
case Snapshot:
newState = ReplicatorState.ONLINE;
break;
case Probe:
newState = ReplicatorState.OFFLINE;
break;
case Destroyed:
newState = ReplicatorState.DESTROYED;
break;
}

if(newState != null) {
notifyReplicatorStatusListener(this, ReplicatorEvent.STATE_CHANGED, null, newState);
}
}
}

@OnlyForTest
Expand Down Expand Up @@ -539,7 +605,7 @@ private void startHeartbeatTimer(final long startMs) {
}

void installSnapshot() {
if (this.state == State.Snapshot) {
if (getState() == State.Snapshot) {
LOG.warn("Replicator {} is installing snapshot, ignore the new request.", this.options.getPeerId());
this.id.unlock();
return;
Expand All @@ -548,7 +614,7 @@ void installSnapshot() {
try {
Requires.requireTrue(this.reader == null,
"Replicator %s already has a snapshot reader, current state is %s", this.options.getPeerId(),
this.state);
getState());
this.reader = this.options.getSnapshotStorage().open();
if (this.reader == null) {
final NodeImpl node = this.options.getNode();
Expand Down Expand Up @@ -595,7 +661,7 @@ void installSnapshot() {
this.statInfo.lastTermIncluded = meta.getLastIncludedTerm();

final InstallSnapshotRequest request = rb.build();
this.state = State.Snapshot;
setState(State.Snapshot);
// noinspection NonAtomicOperationOnVolatileField
this.installSnapshotCounter++;
final long monotonicSendTimeMs = Utils.monotonicMs();
Expand Down Expand Up @@ -657,7 +723,7 @@ static boolean onInstallSnapshotReturned(final ThreadId id, final Replicator r,
if (!success) {
//should reset states
r.resetInflights();
r.state = State.Probe;
r.setState(State.Probe);
r.block(Utils.nowMs(), status.getCode());
return false;
}
Expand All @@ -667,7 +733,7 @@ static boolean onInstallSnapshotReturned(final ThreadId id, final Replicator r,
r.sendTimeoutNow(false, false);
}
// id is unlock in _send_entriesheartbeatCounter
r.state = State.Replicate;
r.setState(State.Replicate);
return true;
}

Expand Down Expand Up @@ -725,7 +791,7 @@ public void run(final Status status) {
this.statInfo.firstLogIndex = this.nextIndex;
this.statInfo.lastLogIndex = this.nextIndex - 1;
this.appendEntriesCounter++;
this.state = State.Probe;
setState(State.Probe);
final int stateVersion = this.version;
final int seq = getAndIncrementReqSeq();
final Future<Message> rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(),
Expand Down Expand Up @@ -868,7 +934,7 @@ public static void waitForCaughtUp(final ThreadId id, final long maxMargin, fina

@Override
public String toString() {
return "Replicator [state=" + this.state + ", statInfo=" + this.statInfo + ", peerId="
return "Replicator [state=" + getState() + ", statInfo=" + this.statInfo + ", peerId="
+ this.options.getPeerId() + ", type=" + this.options.getReplicatorType() + "]";
}

Expand Down Expand Up @@ -1068,7 +1134,7 @@ void destroy() {
this.nodeMetrics.getMetricRegistry() //
.removeMatching(MetricFilter.startsWith(getReplicatorMetricName(this.options)));
}
this.state = State.Destroyed;
setState(State.Destroyed);
notifyReplicatorStatusListener((Replicator) savedId.getData(), ReplicatorEvent.DESTROYED);
savedId.unlockAndDestroy();
this.id = null;
Expand Down Expand Up @@ -1114,7 +1180,7 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap
.append(status);
LOG.debug(sb.toString());
}
r.state = State.Probe;
r.setState(State.Probe);
notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
if (++r.consecutiveErrorTimes % 10 == 0) {
LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(),
Expand Down Expand Up @@ -1194,7 +1260,7 @@ static void onRpcReturned(final ThreadId id, final RequestType reqType, final St
LOG.warn("Too many pending responses {} for replicator {}, maxReplicatorInflightMsgs={}",
holdingQueue.size(), r.options.getPeerId(), r.raftOptions.getMaxReplicatorInflightMsgs());
r.resetInflights();
r.state = State.Probe;
r.setState(State.Probe);
r.sendEmptyEntries(false);
return;
}
Expand Down Expand Up @@ -1247,7 +1313,7 @@ static void onRpcReturned(final ThreadId id, final RequestType reqType, final St
"Replicator {} response sequence out of order, expect {}, but it is {}, reset state to try again.",
r, inflight.seq, queuedPipelinedResponse.seq);
r.resetInflights();
r.state = State.Probe;
r.setState(State.Probe);
continueSendEntries = false;
r.block(Utils.nowMs(), RaftError.EREQUEST.getNumber());
return;
Expand Down Expand Up @@ -1309,7 +1375,7 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight
"Replicator {} received invalid AppendEntriesResponse, in-flight startIndex={}, request prevLogIndex={}, reset the replicator state and probe again.",
r, inflight.startIndex, request.getPrevLogIndex());
r.resetInflights();
r.state = State.Probe;
r.setState(State.Probe);
// unlock id in sendEmptyEntries
r.sendEmptyEntries(false);
return false;
Expand Down Expand Up @@ -1354,7 +1420,7 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight
r.consecutiveErrorTimes, status);
}
r.resetInflights();
r.state = State.Probe;
r.setState(State.Probe);
// unlock in in block
r.block(startTimeMs, status.getCode());
return false;
Expand Down Expand Up @@ -1413,7 +1479,7 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight
// success
if (response.getTerm() != r.options.getTerm()) {
r.resetInflights();
r.state = State.Probe;
r.setState(State.Probe);
LOG.error("Fail, response term {} dismatch, expect term {}", response.getTerm(), r.options.getTerm());
id.unlock();
return false;
Expand All @@ -1433,7 +1499,7 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight
}
}

r.state = State.Replicate;
r.setState(State.Replicate);
r.blockTimer = null;
r.nextIndex += entriesSize;
r.hasSucceeded = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import com.alipay.sofa.jraft.closure.SynchronizedClosure;
import com.alipay.sofa.jraft.closure.TaskClosure;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.core.Replicator.ReplicatorState;
import com.alipay.sofa.jraft.entity.EnumOutter;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.Task;
Expand Down Expand Up @@ -524,6 +525,11 @@ public void onCreated(final PeerId peer) {
NodeTest.this.startedCounter.incrementAndGet();
}

@Override
public void stateChanged(final PeerId peer, final ReplicatorState newState) {
LOG.info("Replicator {} state is changed into {}.", peer, newState);
}

@Override
public void onError(final PeerId peer, final Status status) {
LOG.info("Replicator has errors");
Expand Down

0 comments on commit 8051538

Please sign in to comment.