Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/backpressure #764

Merged
merged 9 commits into from
Feb 25, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.conf.ConfigurationEntry;
import com.alipay.sofa.jraft.entity.EnumOutter;
import com.alipay.sofa.jraft.entity.EnumOutter.ErrorType;
import com.alipay.sofa.jraft.entity.LeaderChangeContext;
import com.alipay.sofa.jraft.entity.LogEntry;
import com.alipay.sofa.jraft.entity.LogId;
Expand Down Expand Up @@ -232,11 +231,7 @@ private boolean enqueueTask(final EventTranslator<ApplyTask> tpl) {
LOG.warn("FSMCaller is stopped, can not apply new task.");
return false;
}
if (!this.taskQueue.tryPublishEvent(tpl)) {
setError(new RaftException(ErrorType.ERROR_TYPE_STATE_MACHINE, new Status(RaftError.EBUSY,
"FSMCaller is overload.")));
return false;
}
this.taskQueue.publishEvent(tpl);
return true;
}

Expand Down
78 changes: 46 additions & 32 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import com.alipay.sofa.jraft.entity.UserLog;
import com.alipay.sofa.jraft.error.LogIndexOutOfBoundsException;
import com.alipay.sofa.jraft.error.LogNotFoundException;
import com.alipay.sofa.jraft.error.OverloadException;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.option.BallotBoxOptions;
Expand Down Expand Up @@ -115,7 +116,6 @@
import com.alipay.sofa.jraft.util.RpcFactoryHelper;
import com.alipay.sofa.jraft.util.SignalHelper;
import com.alipay.sofa.jraft.util.SystemPropertyUtil;
import com.alipay.sofa.jraft.util.ThreadHelper;
import com.alipay.sofa.jraft.util.ThreadId;
import com.alipay.sofa.jraft.util.Utils;
import com.alipay.sofa.jraft.util.concurrent.LongHeldDetectingReadWriteLock;
Expand Down Expand Up @@ -159,9 +159,6 @@ public class NodeImpl implements Node, RaftServerService {
public final static RaftTimerFactory TIMER_FACTORY = JRaftUtils
.raftTimerFactory();

// Max retry times when applying tasks.
private static final int MAX_APPLY_RETRY_TIMES = 3;

public static final AtomicInteger GLOBAL_NUM_NODES = new AtomicInteger(
0);

Expand Down Expand Up @@ -1611,33 +1608,31 @@ public void apply(final Task task) {

final LogEntry entry = new LogEntry();
entry.setData(task.getData());
int retryTimes = 0;
try {
final EventTranslator<LogEntryAndClosure> translator = (event, sequence) -> {
event.reset();
event.done = task.getDone();
event.entry = entry;
event.expectedTerm = task.getExpectedTerm();
};
while (true) {
if (this.applyQueue.tryPublishEvent(translator)) {
break;
} else {
retryTimes++;
if (retryTimes > MAX_APPLY_RETRY_TIMES) {
Utils.runClosureInThread(task.getDone(),
new Status(RaftError.EBUSY, "Node is busy, has too many tasks."));
LOG.warn("Node {} applyQueue is overload.", getNodeId());
this.metrics.recordTimes("apply-task-overload-times", 1);
return;
}
ThreadHelper.onSpinWait();
}
}

} catch (final Exception e) {
LOG.error("Fail to apply task.", e);
Utils.runClosureInThread(task.getDone(), new Status(RaftError.EPERM, "Node is down."));
final EventTranslator<LogEntryAndClosure> translator = (event, sequence) -> {
event.reset();
event.done = task.getDone();
event.entry = entry;
event.expectedTerm = task.getExpectedTerm();
};

switch(this.options.getApplyTaskMode()) {
case Blocking:
this.applyQueue.publishEvent(translator);
break;
case NonBlocking:
default:
fengjiachun marked this conversation as resolved.
Show resolved Hide resolved
if (!this.applyQueue.tryPublishEvent(translator)) {
String errorMsg = "Node is busy, has too many tasks, queue is full and bufferSize="+ this.applyQueue.getBufferSize();
Utils.runClosureInThread(task.getDone(),
new Status(RaftError.EBUSY, errorMsg));
LOG.warn("Node {} applyQueue is overload.", getNodeId());
this.metrics.recordTimes("apply-task-overload-times", 1);
if(task.getDone() == null) {
throw new OverloadException(errorMsg);
}
}
break;
}
}

Expand Down Expand Up @@ -1890,6 +1885,7 @@ public Message handleAppendEntriesRequest(final AppendEntriesRequest request, fi
final long startMs = Utils.monotonicMs();
this.writeLock.lock();
final int entriesCount = request.getEntriesCount();
boolean success = false;
try {
if (!this.state.isActive()) {
LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
Expand Down Expand Up @@ -1975,6 +1971,15 @@ public Message handleAppendEntriesRequest(final AppendEntriesRequest request, fi
return respBuilder.build();
}

// fast checking if log manager is overloaded
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
if (!this.logManager.hasAvailableCapacityToAppendEntries(1)) {
LOG.warn("Node {} received AppendEntriesRequest but log manager is busy.", getNodeId());
return RpcFactoryHelper //
.responseFactory() //
.newResponse(AppendEntriesResponse.getDefaultInstance(), RaftError.EBUSY,
"Node %s:%s log manager is busy.", this.groupId, this.serverId);
}

// Parse request
long index = prevLogIndex;
final List<LogEntry> entries = new ArrayList<>(entriesCount);
Expand Down Expand Up @@ -2014,13 +2019,22 @@ public Message handleAppendEntriesRequest(final AppendEntriesRequest request, fi
this.logManager.appendEntries(entries, closure);
// update configuration after _log_manager updated its memory status
checkAndSetConfiguration(true);
success = true;
return null;
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
this.metrics.recordLatency("handle-append-entries", Utils.monotonicMs() - startMs);
this.metrics.recordSize("handle-append-entries-count", entriesCount);
final long processLatency = Utils.monotonicMs() - startMs;
if (entriesCount == 0) {
this.metrics.recordLatency("handle-heartbeat-requests", processLatency);
} else {
this.metrics.recordLatency("handle-append-entries", processLatency);
}
if (success) {
// Don't stats heartbeat requests.
this.metrics.recordSize("handle-append-entries-count", entriesCount);
fengjiachun marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.entity.ReadIndexState;
import com.alipay.sofa.jraft.entity.ReadIndexStatus;
import com.alipay.sofa.jraft.error.OverloadException;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.option.RaftOptions;
Expand All @@ -51,7 +52,6 @@
import com.alipay.sofa.jraft.util.LogExceptionHandler;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.OnlyForTest;
import com.alipay.sofa.jraft.util.ThreadHelper;
import com.alipay.sofa.jraft.util.Utils;
import com.google.protobuf.ZeroByteStringHelper;
import com.lmax.disruptor.BlockingWaitStrategy;
Expand All @@ -69,13 +69,12 @@
*/
public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndexListener {

private static final int MAX_ADD_REQUEST_RETRY_TIMES = 3;
/** Disruptor to run readonly service. */
private Disruptor<ReadIndexEvent> readIndexDisruptor;
private RingBuffer<ReadIndexEvent> readIndexQueue;
private RaftOptions raftOptions;
private NodeImpl node;
private final Lock lock = new ReentrantLock();
private final Lock lock = new ReentrantLock();
private FSMCaller fsmCaller;
private volatile CountDownLatch shutdownLatch;

Expand All @@ -86,10 +85,10 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
private volatile RaftException error;

// <logIndex, statusList>
private final TreeMap<Long, List<ReadIndexStatus>> pendingNotifyStatus = new TreeMap<>();
private final TreeMap<Long, List<ReadIndexStatus>> pendingNotifyStatus = new TreeMap<>();

private static final Logger LOG = LoggerFactory
.getLogger(ReadOnlyServiceImpl.class);
private static final Logger LOG = LoggerFactory
.getLogger(ReadOnlyServiceImpl.class);

private static class ReadIndexEvent {
Bytes requestContext;
Expand Down Expand Up @@ -329,21 +328,24 @@ public void addRequest(final byte[] reqCtx, final ReadIndexClosure closure) {
event.requestContext = new Bytes(reqCtx);
event.startTime = Utils.monotonicMs();
};
int retryTimes = 0;
while (true) {
if (this.readIndexQueue.tryPublishEvent(translator)) {
break;
} else {
retryTimes++;
if (retryTimes > MAX_ADD_REQUEST_RETRY_TIMES) {
Utils.runClosureInThread(closure,
new Status(RaftError.EBUSY, "Node is busy, has too many read-only requests."));
this.nodeMetrics.recordTimes("read-index-overload-times", 1);
LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId());
return;

switch(this.node.getOptions().getApplyTaskMode()) {
case Blocking:
this.readIndexQueue.publishEvent(translator);
break;
case NonBlocking:
default:
if (!this.readIndexQueue.tryPublishEvent(translator)) {
final String errorMsg = "Node is busy, has too many read-index requests, queue is full and bufferSize="+ this.readIndexQueue.getBufferSize();
Utils.runClosureInThread(closure,
new Status(RaftError.EBUSY, errorMsg));
this.nodeMetrics.recordTimes("read-index-overload-times", 1);
LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId());
if(closure == null) {
throw new OverloadException(errorMsg);
}
ThreadHelper.onSpinWait();
}
}
break;
}
} catch (final Exception e) {
Utils.runClosureInThread(closure, new Status(RaftError.EPERM, "Node is down."));
Expand Down Expand Up @@ -415,7 +417,7 @@ TreeMap<Long, List<ReadIndexStatus>> getPendingNotifyStatus() {

@OnlyForTest
RaftOptions getRaftOptions() {
return raftOptions;
return this.raftOptions;
}

private void reportError(final ReadIndexStatus status, final Status st) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,8 +618,8 @@ void installSnapshot() {
return;
}
boolean doUnlock = true;
if (!rpcService.connect(options.getPeerId().getEndpoint())) {
LOG.error("Fail to check install snapshot connection to peer={}, give up to send install snapshot request.", options.getPeerId().getEndpoint());
if (!this.rpcService.connect(this.options.getPeerId().getEndpoint())) {
LOG.error("Fail to check install snapshot connection to peer={}, give up to send install snapshot request.", this.options.getPeerId().getEndpoint());
block(Utils.nowMs(), RaftError.EHOSTDOWN.getNumber());
return;
}
Expand Down Expand Up @@ -1438,6 +1438,20 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight
}
r.consecutiveErrorTimes = 0;
if (!response.getSuccess()) {
// Target node is is busy, sleep for a while.
if(response.getErrorResponse().getErrorCode() == RaftError.EBUSY.getNumber()) {
if (isLogDebugEnabled) {
sb.append(" is busy, sleep, errorMsg='") //
.append(response.getErrorResponse().getErrorMsg()).append("'");
LOG.debug(sb.toString());
}
r.resetInflights();
r.setState(State.Probe);
// unlock in in block
r.block(startTimeMs, status.getCode());
return false;
}

if (response.getTerm() > r.options.getTerm()) {
if (isLogDebugEnabled) {
sb.append(" fail, greater term ") //
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.error;

/**
* Threw when Node is overloaded.
* @author boyan(boyan@antfin.com)
*
*/
public class OverloadException extends JRaftException {

/**
*
*/
private static final long serialVersionUID = -5505054326197103575L;

public OverloadException() {
super();
}

public OverloadException(final String message, final Throwable cause, final boolean enableSuppression,
final boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}

public OverloadException(final String message, final Throwable cause) {
super(message, cause);
}

public OverloadException(final String message) {
super(message);
}

public OverloadException(final Throwable cause) {
super(cause);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.option;

/**
* Apply task in blocking or non-blocking mode.
* @author boyan(boyan@antfin.com)
*
*/
public enum ApplyTaskMode {
Blocking, NonBlocking
}
Loading