
[CCR] Added write buffer size limit #34797

Merged: 12 commits, Oct 24, 2018
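This change renames the existing operation-count limit to max_write_buffer_count and adds a new byte-based max_write_buffer_size limit, so the follower backs off fetching new operations when either the number of buffered translog operations or their total estimated size grows too large. The sketch below is a hypothetical illustration of setting both limits on a resume-follow request; it mirrors the setter calls made on followRequest in the auto-follow hunk that follows, but the concrete request class, its no-arg constructor, and setFollowerIndex are assumptions, not part of this diff.

import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;

// Hypothetical sketch, not taken from this PR: configure both write-buffer limits.
class WriteBufferLimitsExample {
    static ResumeFollowAction.Request buildFollowRequest() {
        // Assumed no-arg constructor and setFollowerIndex; only the two
        // buffer-limit setters below appear in the diff itself.
        ResumeFollowAction.Request followRequest = new ResumeFollowAction.Request();
        followRequest.setFollowerIndex("copy-logs-201901");
        // Back off reads once this many translog operations are queued for writing.
        followRequest.setMaxWriteBufferCount(10240);
        // Also back off once their total estimated size reaches 512 MB, the new
        // byte-based limit (512 MB is the default added in TransportResumeFollowAction).
        followRequest.setMaxWriteBufferSize(new ByteSizeValue(512, ByteSizeUnit.MB));
        return followRequest;
    }
}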
@@ -328,6 +328,7 @@ private void followLeaderIndex(String autoFollowPattenName,
followRequest.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches());
followRequest.setMaxBatchSize(pattern.getMaxBatchSize());
followRequest.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches());
followRequest.setMaxWriteBufferCount(pattern.getMaxWriteBufferCount());
Member
We forgot to pass the "maxWriteBufferSize" parameter.

Member Author
whoops

Member Author
fixed: 7af8ba2

followRequest.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
followRequest.setMaxRetryDelay(pattern.getMaxRetryDelay());
followRequest.setPollTimeout(pattern.getPollTimeout());
@@ -85,6 +85,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private long numberOfOperationsIndexed = 0;
private long lastFetchTime = -1;
private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo));
private long bufferSizeInBytes = 0;
private final LinkedHashMap<Long, Tuple<AtomicInteger, ElasticsearchException>> fetchExceptions;

private volatile ElasticsearchException fatalException;
@@ -183,8 +184,12 @@ private boolean hasReadBudget() {
params.getFollowShardId(), numConcurrentReads);
return false;
}
if (buffer.size() > params.getMaxWriteBufferSize()) {
LOGGER.trace("{} no new reads, buffer limit has been reached [{}]", params.getFollowShardId(), buffer.size());
if (bufferSizeInBytes >= params.getMaxWriteBufferSize().getBytes()) {
LOGGER.trace("{} no new reads, buffer size limit has been reached [{}]", params.getFollowShardId(), bufferSizeInBytes);
return false;
}
if (buffer.size() > params.getMaxWriteBufferCount()) {
LOGGER.trace("{} no new reads, buffer count limit has been reached [{}]", params.getFollowShardId(), buffer.size());
return false;
}
return true;
@@ -208,6 +213,7 @@ private synchronized void coordinateWrites() {
break;
}
}
bufferSizeInBytes -= sumEstimatedSize;
numConcurrentWrites++;
LOGGER.trace("{}[{}] write [{}/{}] [{}]", params.getFollowShardId(), numConcurrentWrites, ops.get(0).seqNo(),
ops.get(ops.size() - 1).seqNo(), ops.size());
@@ -281,7 +287,12 @@ synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, Shar
} else {
assert response.getOperations()[0].seqNo() == from :
"first operation is not what we asked for. From is [" + from + "], got " + response.getOperations()[0];
buffer.addAll(Arrays.asList(response.getOperations()));
List<Translog.Operation> operations = Arrays.asList(response.getOperations());
long operationsSize = operations.stream()
.mapToLong(Translog.Operation::estimateSize)
.sum();
buffer.addAll(operations);
bufferSizeInBytes += operationsSize;
final long maxSeqNo = response.getOperations()[response.getOperations().length - 1].seqNo();
assert maxSeqNo ==
Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::seqNo).max().getAsLong();
@@ -455,6 +466,7 @@ public synchronized ShardFollowNodeTaskStatus getStatus() {
numConcurrentReads,
numConcurrentWrites,
buffer.size(),
bufferSizeInBytes,
currentMappingVersion,
totalFetchTimeMillis,
totalFetchTookTimeMillis,
@@ -48,6 +48,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
public static final ParseField MAX_CONCURRENT_READ_BATCHES = new ParseField("max_concurrent_read_batches");
public static final ParseField MAX_BATCH_SIZE = new ParseField("max_batch_size");
public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches");
public static final ParseField MAX_WRITE_BUFFER_COUNT = new ParseField("max_write_buffer_count");
public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay");
public static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout");
@@ -56,7 +57,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
private static ConstructingObjectParser<ShardFollowTask, Void> PARSER = new ConstructingObjectParser<>(NAME,
(a) -> new ShardFollowTask((String) a[0], new ShardId((String) a[1], (String) a[2], (int) a[3]),
new ShardId((String) a[4], (String) a[5], (int) a[6]), (int) a[7], (int) a[8], (ByteSizeValue) a[9],
(int) a[10], (int) a[11], (TimeValue) a[12], (TimeValue) a[13], (Map<String, String>) a[14]));
(int) a[10], (int) a[11], (ByteSizeValue) a[12], (TimeValue) a[13], (TimeValue) a[14], (Map<String, String>) a[15]));

static {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REMOTE_CLUSTER_FIELD);
@@ -74,7 +75,12 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
MAX_BATCH_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_CONCURRENT_WRITE_BATCHES);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_WRITE_BUFFER_SIZE);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_WRITE_BUFFER_COUNT);
PARSER.declareField(
ConstructingObjectParser.constructorArg(),
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()),
MAX_WRITE_BUFFER_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareField(ConstructingObjectParser.constructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()),
MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
@@ -91,7 +97,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
private final int maxConcurrentReadBatches;
private final ByteSizeValue maxBatchSize;
private final int maxConcurrentWriteBatches;
private final int maxWriteBufferSize;
private final int maxWriteBufferCount;
private final ByteSizeValue maxWriteBufferSize;
private final TimeValue maxRetryDelay;
private final TimeValue pollTimeout;
private final Map<String, String> headers;
@@ -104,7 +111,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
final int maxConcurrentReadBatches,
final ByteSizeValue maxBatchSize,
final int maxConcurrentWriteBatches,
final int maxWriteBufferSize,
final int maxWriteBufferCount,
final ByteSizeValue maxWriteBufferSize,
final TimeValue maxRetryDelay,
final TimeValue pollTimeout,
final Map<String, String> headers) {
@@ -115,6 +123,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
this.maxConcurrentReadBatches = maxConcurrentReadBatches;
this.maxBatchSize = maxBatchSize;
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
this.maxWriteBufferCount = maxWriteBufferCount;
this.maxWriteBufferSize = maxWriteBufferSize;
this.maxRetryDelay = maxRetryDelay;
this.pollTimeout = pollTimeout;
@@ -129,7 +138,8 @@ public ShardFollowTask(StreamInput in) throws IOException {
this.maxConcurrentReadBatches = in.readVInt();
this.maxBatchSize = new ByteSizeValue(in);
this.maxConcurrentWriteBatches = in.readVInt();
this.maxWriteBufferSize = in.readVInt();
this.maxWriteBufferCount = in.readVInt();
this.maxWriteBufferSize = new ByteSizeValue(in);
this.maxRetryDelay = in.readTimeValue();
this.pollTimeout = in.readTimeValue();
this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
@@ -159,7 +169,11 @@ public int getMaxConcurrentWriteBatches() {
return maxConcurrentWriteBatches;
}

public int getMaxWriteBufferSize() {
public int getMaxWriteBufferCount() {
return maxWriteBufferCount;
}

public ByteSizeValue getMaxWriteBufferSize() {
return maxWriteBufferSize;
}

@@ -197,7 +211,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(maxConcurrentReadBatches);
maxBatchSize.writeTo(out);
out.writeVInt(maxConcurrentWriteBatches);
out.writeVInt(maxWriteBufferSize);
out.writeVInt(maxWriteBufferCount);
maxWriteBufferSize.writeTo(out);
out.writeTimeValue(maxRetryDelay);
out.writeTimeValue(pollTimeout);
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
@@ -221,7 +236,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches);
builder.field(MAX_BATCH_SIZE.getPreferredName(), maxBatchSize.getStringRep());
builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
builder.field(MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount);
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep());
builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep());
builder.field(HEADERS.getPreferredName(), headers);
@@ -240,7 +256,8 @@ public boolean equals(Object o) {
maxConcurrentReadBatches == that.maxConcurrentReadBatches &&
maxConcurrentWriteBatches == that.maxConcurrentWriteBatches &&
maxBatchSize.equals(that.maxBatchSize) &&
maxWriteBufferSize == that.maxWriteBufferSize &&
maxWriteBufferCount == that.maxWriteBufferCount &&
maxWriteBufferSize.equals(that.maxWriteBufferSize) &&
Objects.equals(maxRetryDelay, that.maxRetryDelay) &&
Objects.equals(pollTimeout, that.pollTimeout) &&
Objects.equals(headers, that.headers);
@@ -256,6 +273,7 @@ public int hashCode() {
maxConcurrentReadBatches,
maxConcurrentWriteBatches,
maxBatchSize,
maxWriteBufferCount,
maxWriteBufferSize,
maxRetryDelay,
pollTimeout,
@@ -164,6 +164,7 @@ static ClusterState innerPut(PutAutoFollowPatternAction.Request request,
request.getMaxConcurrentReadBatches(),
request.getMaxBatchSize(),
request.getMaxConcurrentWriteBatches(),
request.getMaxWriteBufferCount(),
request.getMaxWriteBufferSize(),
request.getMaxRetryDelay(),
request.getPollTimeout());
@@ -55,7 +55,8 @@ public class TransportResumeFollowAction extends HandledTransportAction<ResumeFo
static final ByteSizeValue DEFAULT_MAX_BATCH_SIZE = new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES);
private static final TimeValue DEFAULT_MAX_RETRY_DELAY = new TimeValue(500);
private static final int DEFAULT_MAX_CONCURRENT_WRITE_BATCHES = 9;
private static final int DEFAULT_MAX_WRITE_BUFFER_SIZE = 10240;
private static final int DEFAULT_MAX_WRITE_BUFFER_COUNT = Integer.MAX_VALUE;
private static final ByteSizeValue DEFAULT_MAX_WRITE_BUFFER_SIZE = new ByteSizeValue(512, ByteSizeUnit.MB);
private static final int DEFAULT_MAX_BATCH_OPERATION_COUNT = 5120;
private static final int DEFAULT_MAX_CONCURRENT_READ_BATCHES = 12;
static final TimeValue DEFAULT_POLL_TIMEOUT = TimeValue.timeValueMinutes(1);
@@ -259,7 +260,14 @@ private static ShardFollowTask createShardFollowTask(
maxConcurrentWriteBatches = DEFAULT_MAX_CONCURRENT_WRITE_BATCHES;
}

int maxWriteBufferSize;
int maxWriteBufferCount;
if (request.getMaxWriteBufferCount() != null) {
maxWriteBufferCount = request.getMaxWriteBufferCount();
} else {
maxWriteBufferCount = DEFAULT_MAX_WRITE_BUFFER_COUNT;
}

ByteSizeValue maxWriteBufferSize;
if (request.getMaxWriteBufferSize() != null) {
maxWriteBufferSize = request.getMaxWriteBufferSize();
} else {
@@ -277,6 +285,7 @@ private static ShardFollowTask createShardFollowTask(
maxConcurrentReadBatches,
maxBatchSize,
maxConcurrentWriteBatches,
maxWriteBufferCount,
maxWriteBufferSize,
maxRetryDelay,
pollTimeout,
@@ -46,6 +46,8 @@
import org.elasticsearch.xpack.ccr.LocalStateCcr;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -65,6 +67,7 @@
import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

public abstract class CcrIntegTestCase extends ESTestCase {
@@ -103,6 +106,7 @@ public final void startClusters() throws Exception {

@After
public void afterTest() throws Exception {
ensureEmptyWriteBuffers();
String masterNode = clusterGroup.followerCluster.getMasterName();
ClusterService clusterService = clusterGroup.followerCluster.getInstance(ClusterService.class, masterNode);
removeCCRRelatedMetadataFromClusterState(clusterService);
@@ -263,6 +267,18 @@ protected final RefreshResponse refresh(Client client, String... indices) {
return actionGet;
}

protected void ensureEmptyWriteBuffers() throws Exception {
assertBusy(() -> {
FollowStatsAction.StatsResponses statsResponses =
leaderClient().execute(FollowStatsAction.INSTANCE, new FollowStatsAction.StatsRequest()).actionGet();
for (FollowStatsAction.StatsResponse statsResponse : statsResponses.getStatsResponses()) {
ShardFollowNodeTaskStatus status = statsResponse.status();
assertThat(status.numberOfQueuedWrites(), equalTo(0));
assertThat(status.bufferSize(), equalTo(0L));
}
});
}

static void removeCCRRelatedMetadataFromClusterState(ClusterService clusterService) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("remove-ccr-related-metadata", new ClusterStateUpdateTask() {
@@ -16,6 +16,8 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.LocalStateCcr;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import org.junit.After;
@@ -26,6 +28,7 @@

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.CcrIntegTestCase.removeCCRRelatedMetadataFromClusterState;
import static org.hamcrest.Matchers.equalTo;

public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase {

@@ -80,4 +83,16 @@ protected PutFollowAction.Request getPutFollowRequest() {
return request;
}

protected void ensureEmptyWriteBuffers() throws Exception {
assertBusy(() -> {
FollowStatsAction.StatsResponses statsResponses =
client().execute(FollowStatsAction.INSTANCE, new FollowStatsAction.StatsRequest()).actionGet();
for (FollowStatsAction.StatsResponse statsResponse : statsResponses.getStatsResponses()) {
ShardFollowNodeTaskStatus status = statsResponse.status();
assertThat(status.numberOfQueuedWrites(), equalTo(0));
assertThat(status.bufferSize(), equalTo(0L));
}
});
}

}
@@ -126,7 +126,7 @@ public void testAutoFollowParameterAreDelegated() throws Exception {
// Need to set this, because following an index in the same cluster
request.setFollowIndexNamePattern("copy-{{leader_index}}");
if (randomBoolean()) {
request.setMaxWriteBufferSize(randomIntBetween(0, Integer.MAX_VALUE));
request.setMaxWriteBufferCount(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxConcurrentReadBatches(randomIntBetween(0, Integer.MAX_VALUE));
@@ -137,6 +137,9 @@ if (randomBoolean()) {
if (randomBoolean()) {
request.setMaxBatchOperationCount(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong()));
}
if (randomBoolean()) {
request.setMaxBatchSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES));
}
@@ -157,8 +160,11 @@
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTasksMetaData.tasks().iterator().next().getParams();
assertThat(shardFollowTask.getLeaderShardId().getIndexName(), equalTo("logs-201901"));
assertThat(shardFollowTask.getFollowShardId().getIndexName(), equalTo("copy-logs-201901"));
if (request.getMaxWriteBufferCount() != null) {
assertThat(shardFollowTask.getMaxWriteBufferCount(), equalTo(request.getMaxWriteBufferCount()));
}
if (request.getMaxWriteBufferSize() != null) {
assertThat(shardFollowTask.getMaxWriteBufferSize(), equalTo(request.getMaxWriteBufferSize()));
assertThat(shardFollowTask.getMaxConcurrentWriteBatches(), equalTo(request.getMaxConcurrentWriteBatches()));
}
if (request.getMaxConcurrentReadBatches() != null) {
assertThat(shardFollowTask.getMaxConcurrentReadBatches(), equalTo(request.getMaxConcurrentReadBatches()));
@@ -49,6 +49,7 @@ protected AutoFollowMetadata createTestInstance() {
new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES),
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),
new ByteSizeValue(randomNonNegativeLong()),
TimeValue.timeValueMillis(500),
TimeValue.timeValueMillis(500));
configs.put(Integer.toString(i), autoFollowPattern);
Expand Down