Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(feat) Adds new method stateChanged(peerId, newState) for ReplicatorS… #558

Merged
merged 2 commits into from
Mar 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 87 additions & 21 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.CatchUpClosure;
import com.alipay.sofa.jraft.core.Replicator.ReplicatorStateListener.ReplicatorState;
import com.alipay.sofa.jraft.entity.EnumOutter;
import com.alipay.sofa.jraft.entity.LogEntry;
import com.alipay.sofa.jraft.entity.PeerId;
Expand Down Expand Up @@ -145,11 +146,12 @@ private int getAndIncrementRequiredNextSeq() {
}

/**
* Replicator state
* Replicator internal state
* @author dennis
*
*/
public enum State {
Created,
Probe, // probe follower state
Snapshot, // installing snapshot to follower
Replicate, // replicate logs normally
Expand All @@ -164,6 +166,7 @@ public Replicator(final ReplicatorOptions replicatorOptions, final RaftOptions r
this.timerManager = replicatorOptions.getTimerManager();
this.raftOptions = raftOptions;
this.rpcService = replicatorOptions.getRaftRpcService();
setState(State.Created);
}

/**
Expand Down Expand Up @@ -209,7 +212,8 @@ enum RunningState {
enum ReplicatorEvent {
CREATED, // created
ERROR, // error
DESTROYED // destroyed
DESTROYED ,// destroyed
STATE_CHANGED; // state changed.
}

/**
Expand All @@ -222,6 +226,31 @@ enum ReplicatorEvent {
*/
public interface ReplicatorStateListener {

/**
* Represents state changes in the replicator.
* @author boyan(boyan@antfin.com)
*
*/
enum ReplicatorState{
/**
* The replicator is created.
*/
CREATED,
/**
* The replicator is destroyed.
*/
DESTROYED,
/**
* The replicator begins to doing it's job(replicating logs or installing snapshot).
*/
ONLINE,
/**
* The replicaotr is suspended by raft error or lost connection.
*/
OFFLINE
}


/**
* Called when this replicator has been created.
*
Expand All @@ -243,6 +272,19 @@ public interface ReplicatorStateListener {
* @param peer replicator related peerId
*/
void onDestroyed(final PeerId peer);

/**
* Called when the replicator state is changed. See {@link ReplicatorState}
* @param peer the replicator's peer id.
* @param newState the new replicator state.
* @since 1.3.6
*/
default void stateChanged(final PeerId peer, final ReplicatorState newState) {}
}

private static void notifyReplicatorStatusListener(final Replicator replicator, final ReplicatorEvent event,
final Status status) {
notifyReplicatorStatusListener(replicator, event, status, null);
}

/**
Expand All @@ -253,7 +295,7 @@ public interface ReplicatorStateListener {
* @param status replicator's error detailed status
*/
private static void notifyReplicatorStatusListener(final Replicator replicator, final ReplicatorEvent event,
final Status status) {
final Status status, final ReplicatorState newState) {
final ReplicatorOptions replicatorOpts = Requires.requireNonNull(replicator.getOpts(), "replicatorOptions");
final Node node = Requires.requireNonNull(replicatorOpts.getNode(), "node");
final PeerId peer = Requires.requireNonNull(replicatorOpts.getPeerId(), "peer");
Expand All @@ -265,14 +307,16 @@ private static void notifyReplicatorStatusListener(final Replicator replicator,
try {
switch (event) {
case CREATED:
RpcUtils.runInThread(() -> listener.onCreated(peer));
RpcUtils.runInThread(() -> listener.onCreated(peer));
break;
case ERROR:
RpcUtils.runInThread(() -> listener.onError(peer, status));
break;
case DESTROYED:
RpcUtils.runInThread(() -> listener.onDestroyed(peer));
break;
case STATE_CHANGED:
RpcUtils.runInThread(() -> listener.stateChanged(peer, newState));
default:
break;
}
Expand Down Expand Up @@ -406,14 +450,36 @@ ArrayDeque<Inflight> getInflights() {
return this.inflights;
}

@OnlyForTest
State getState() {
return this.state;
}

@OnlyForTest
void setState(final State state) {
State oldState = this.state;
this.state = state;

if(oldState != state) {
ReplicatorState newState = null;
switch(state) {
case Created:
newState = ReplicatorState.CREATED;
break;
case Replicate:
case Snapshot:
newState = ReplicatorState.ONLINE;
break;
case Probe:
newState = ReplicatorState.OFFLINE;
break;
case Destroyed:
newState = ReplicatorState.DESTROYED;
break;
}

if(newState != null) {
notifyReplicatorStatusListener(this, ReplicatorEvent.STATE_CHANGED, null, newState);
}
}
}

@OnlyForTest
Expand Down Expand Up @@ -539,7 +605,7 @@ private void startHeartbeatTimer(final long startMs) {
}

void installSnapshot() {
if (this.state == State.Snapshot) {
if (getState() == State.Snapshot) {
LOG.warn("Replicator {} is installing snapshot, ignore the new request.", this.options.getPeerId());
this.id.unlock();
return;
Expand All @@ -548,7 +614,7 @@ void installSnapshot() {
try {
Requires.requireTrue(this.reader == null,
"Replicator %s already has a snapshot reader, current state is %s", this.options.getPeerId(),
this.state);
getState());
this.reader = this.options.getSnapshotStorage().open();
if (this.reader == null) {
final NodeImpl node = this.options.getNode();
Expand Down Expand Up @@ -595,7 +661,7 @@ void installSnapshot() {
this.statInfo.lastTermIncluded = meta.getLastIncludedTerm();

final InstallSnapshotRequest request = rb.build();
this.state = State.Snapshot;
setState(State.Snapshot);
// noinspection NonAtomicOperationOnVolatileField
this.installSnapshotCounter++;
final long monotonicSendTimeMs = Utils.monotonicMs();
Expand Down Expand Up @@ -657,7 +723,7 @@ static boolean onInstallSnapshotReturned(final ThreadId id, final Replicator r,
if (!success) {
//should reset states
r.resetInflights();
r.state = State.Probe;
r.setState(State.Probe);
r.block(Utils.nowMs(), status.getCode());
return false;
}
Expand All @@ -667,7 +733,7 @@ static boolean onInstallSnapshotReturned(final ThreadId id, final Replicator r,
r.sendTimeoutNow(false, false);
}
// id is unlock in _send_entriesheartbeatCounter
r.state = State.Replicate;
r.setState(State.Replicate);
return true;
}

Expand Down Expand Up @@ -725,7 +791,7 @@ public void run(final Status status) {
this.statInfo.firstLogIndex = this.nextIndex;
this.statInfo.lastLogIndex = this.nextIndex - 1;
this.appendEntriesCounter++;
this.state = State.Probe;
setState(State.Probe);
final int stateVersion = this.version;
final int seq = getAndIncrementReqSeq();
final Future<Message> rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(),
Expand Down Expand Up @@ -868,7 +934,7 @@ public static void waitForCaughtUp(final ThreadId id, final long maxMargin, fina

@Override
public String toString() {
return "Replicator [state=" + this.state + ", statInfo=" + this.statInfo + ", peerId="
return "Replicator [state=" + getState() + ", statInfo=" + this.statInfo + ", peerId="
+ this.options.getPeerId() + ", type=" + this.options.getReplicatorType() + "]";
}

Expand Down Expand Up @@ -1068,7 +1134,7 @@ void destroy() {
this.nodeMetrics.getMetricRegistry() //
.removeMatching(MetricFilter.startsWith(getReplicatorMetricName(this.options)));
}
this.state = State.Destroyed;
setState(State.Destroyed);
notifyReplicatorStatusListener((Replicator) savedId.getData(), ReplicatorEvent.DESTROYED);
savedId.unlockAndDestroy();
this.id = null;
Expand Down Expand Up @@ -1114,7 +1180,7 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap
.append(status);
LOG.debug(sb.toString());
}
r.state = State.Probe;
r.setState(State.Probe);
notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
if (++r.consecutiveErrorTimes % 10 == 0) {
LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(),
Expand Down Expand Up @@ -1194,7 +1260,7 @@ static void onRpcReturned(final ThreadId id, final RequestType reqType, final St
LOG.warn("Too many pending responses {} for replicator {}, maxReplicatorInflightMsgs={}",
holdingQueue.size(), r.options.getPeerId(), r.raftOptions.getMaxReplicatorInflightMsgs());
r.resetInflights();
r.state = State.Probe;
r.setState(State.Probe);
r.sendEmptyEntries(false);
return;
}
Expand Down Expand Up @@ -1247,7 +1313,7 @@ static void onRpcReturned(final ThreadId id, final RequestType reqType, final St
"Replicator {} response sequence out of order, expect {}, but it is {}, reset state to try again.",
r, inflight.seq, queuedPipelinedResponse.seq);
r.resetInflights();
r.state = State.Probe;
r.setState(State.Probe);
continueSendEntries = false;
r.block(Utils.nowMs(), RaftError.EREQUEST.getNumber());
return;
Expand Down Expand Up @@ -1309,7 +1375,7 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight
"Replicator {} received invalid AppendEntriesResponse, in-flight startIndex={}, request prevLogIndex={}, reset the replicator state and probe again.",
r, inflight.startIndex, request.getPrevLogIndex());
r.resetInflights();
r.state = State.Probe;
r.setState(State.Probe);
// unlock id in sendEmptyEntries
r.sendEmptyEntries(false);
return false;
Expand Down Expand Up @@ -1354,7 +1420,7 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight
r.consecutiveErrorTimes, status);
}
r.resetInflights();
r.state = State.Probe;
r.setState(State.Probe);
// unlock in in block
r.block(startTimeMs, status.getCode());
return false;
Expand Down Expand Up @@ -1413,7 +1479,7 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight
// success
if (response.getTerm() != r.options.getTerm()) {
r.resetInflights();
r.state = State.Probe;
r.setState(State.Probe);
LOG.error("Fail, response term {} dismatch, expect term {}", response.getTerm(), r.options.getTerm());
id.unlock();
return false;
Expand All @@ -1433,7 +1499,7 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight
}
}

r.state = State.Replicate;
r.setState(State.Replicate);
r.blockTimer = null;
r.nextIndex += entriesSize;
r.hasSucceeded = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,11 @@ public void onCreated(final PeerId peer) {
NodeTest.this.startedCounter.incrementAndGet();
}

@Override
public void stateChanged(final PeerId peer, final ReplicatorState newState) {
LOG.info("Replicator {} state is changed into {}.", peer, newState);
}

@Override
public void onError(final PeerId peer, final Status status) {
LOG.info("Replicator has errors");
Expand Down