Merge branch 'master' into ccr
* master:
  Fix resync request serialization
  Fix issue where pages aren't released (#27459)
  Add YAML REST tests for filters bucket agg (#27128)
  Remove tcp profile from low level nio channel (#27441)
jasontedor committed Nov 21, 2017
2 parents 58591f2 + 28660be commit f2eaacb
Showing 25 changed files with 515 additions and 118 deletions.
@@ -18,42 +18,68 @@
*/
package org.elasticsearch.action.resync;

import org.elasticsearch.Version;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;

import java.io.IOException;
import java.util.List;
import java.util.Arrays;

/**
* Represents a batch of operations sent from the primary to its replicas during the primary-replica resync.
*/
public final class ResyncReplicationRequest extends ReplicatedWriteRequest<ResyncReplicationRequest> {

private List<Translog.Operation> operations;
private Translog.Operation[] operations;

ResyncReplicationRequest() {
super();
}

public ResyncReplicationRequest(ShardId shardId, List<Translog.Operation> operations) {
public ResyncReplicationRequest(final ShardId shardId, final Translog.Operation[] operations) {
super(shardId);
this.operations = operations;
}

public List<Translog.Operation> getOperations() {
public Translog.Operation[] getOperations() {
return operations;
}

@Override
public void readFrom(StreamInput in) throws IOException {
public void readFrom(final StreamInput in) throws IOException {
if (in.getVersion().equals(Version.V_6_0_0)) {
/*
* Resync replication request serialization was broken in 6.0.0 due to the elements of the stream not being prefixed with a
* byte indicating the type of the operation.
*/
// TODO: remove this check in 8.0.0 which provides no BWC guarantees with 6.x.
assert Version.CURRENT.major <= 7;
throw new IllegalStateException("resync replication request serialization is broken in 6.0.0");
}
super.readFrom(in);
operations = in.readList(Translog.Operation::readType);
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(operations);
out.writeArray(Translog.Operation::writeOperation, operations);
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final ResyncReplicationRequest that = (ResyncReplicationRequest) o;
return Arrays.equals(operations, that.operations);
}

@Override
public int hashCode() {
return Arrays.hashCode(operations);
}

@Override
@@ -62,7 +88,8 @@ public String toString() {
"shardId=" + shardId +
", timeout=" + timeout +
", index='" + index + '\'' +
", ops=" + operations.size() +
", ops=" + operations.length +
"}";
}

}
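The readFrom guard above rejects any stream whose version is exactly 6.0.0, where operations were written without their type byte. A minimal sketch of how that surfaces to a caller, assuming the stream version can be overridden with StreamInput#setVersion as is commonly done in serialization tests (hypothetical snippet; it relies on the package-visible no-arg constructor, so it lives in the same package as the request):

    final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
    final ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, new Translog.Operation[0]);

    final BytesStreamOutput out = new BytesStreamOutput();
    request.writeTo(out);

    final StreamInput in = out.bytes().streamInput();
    in.setVersion(Version.V_6_0_0); // pretend the bytes came from a 6.0.0 node
    final ResyncReplicationRequest copy = new ResyncReplicationRequest();
    try {
        copy.readFrom(in); // fails fast instead of mis-reading the untyped operations
    } catch (final IllegalStateException e) {
        // expected: "resync replication request serialization is broken in 6.0.0"
    }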
@@ -688,9 +688,21 @@ public byte[] readByteArray() throws IOException {
return bytes;
}

public <T> T[] readArray(Writeable.Reader<T> reader, IntFunction<T[]> arraySupplier) throws IOException {
int length = readArraySize();
T[] values = arraySupplier.apply(length);
/**
* Reads an array from the stream using the specified {@link org.elasticsearch.common.io.stream.Writeable.Reader} to read array elements
* from the stream. This method can be seen as the reader version of {@link StreamOutput#writeArray(Writeable.Writer, Object[])}. It is
* assumed that the stream first contains a variable-length integer representing the size of the array, and then contains that many
* elements that can be read from the stream.
*
* @param reader the reader used to read individual elements
* @param arraySupplier a supplier used to construct a new array
* @param <T> the type of the elements of the array
* @return an array read from the stream
* @throws IOException if an I/O exception occurs while reading the array
*/
public <T> T[] readArray(final Writeable.Reader<T> reader, final IntFunction<T[]> arraySupplier) throws IOException {
final int length = readArraySize();
final T[] values = arraySupplier.apply(length);
for (int i = 0; i < length; i++) {
values[i] = reader.read(this);
}
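A brief usage sketch of the new reader (hypothetical call sites; in stands for any StreamInput positioned at data written by the matching writeArray call, the element reader is any method reference matching Writeable.Reader, and the IntFunction allocates the result array):

    // read a String[] written element by element with a matching writer
    final String[] names = in.readArray(StreamInput::readString, String[]::new);

    // read the translog operations carried by a resync request, as in the change above
    final Translog.Operation[] ops = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);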
@@ -58,6 +58,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntFunction;

/**
* A stream from another node to this node. Technically, it can also be streamed from a byte array but that is mostly for testing.
@@ -706,6 +707,23 @@ public void writeDoubleArray(double[] values) throws IOException {
}
}

/**
* Writes the specified array to the stream, using the specified {@link Writer} to write each element. This method can be seen as the
* writer version of {@link StreamInput#readArray(Writeable.Reader, IntFunction)}. The length of the array, encoded as a variable-length
* integer, is first written to the stream, and then the elements of the array are written.
*
* @param writer the writer used to write individual elements
* @param array the array
* @param <T> the type of the elements of the array
* @throws IOException if an I/O exception occurs while writing the array
*/
public <T> void writeArray(final Writer<T> writer, final T[] array) throws IOException {
writeVInt(array.length);
for (T value : array) {
writer.write(this, value);
}
}

public <T extends Writeable> void writeArray(T[] array) throws IOException {
writeVInt(array.length);
for (T value: array) {
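Together with StreamInput#readArray this forms a symmetric round trip. A minimal sketch using the same BytesStreamOutput pattern as the test added in this commit (the values are made up for illustration):

    final BytesStreamOutput out = new BytesStreamOutput();
    out.writeArray(StreamOutput::writeString, new String[] { "a", "b", "c" });

    final StreamInput in = out.bytes().streamInput();
    final String[] values = in.readArray(StreamInput::readString, String[]::new);
    // values is now {"a", "b", "c"}; the length was written once as a variable-length integer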
@@ -218,6 +218,8 @@ public void onFailure(Exception e) {
}
}

private static Translog.Operation[] EMPTY_ARRAY = new Translog.Operation[0];

@Override
protected void doRun() throws Exception {
long size = 0;
@@ -247,7 +249,7 @@ protected void doRun() throws Exception {

if (!operations.isEmpty()) {
task.setPhase("sending_ops");
ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations);
ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations.toArray(EMPTY_ARRAY));
logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(),
new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get());
syncAction.sync(request, task, primaryAllocationId, primaryTerm, this);
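The sync loop still batches operations into a List and only converts to an array when building the request. A small sketch of the idiom (the shared zero-length EMPTY_ARRAY is just a type token; List#toArray allocates a correctly sized array when the argument is too short, so no per-batch sizing array is needed; the helper below is hypothetical and simply mirrors the inline conversion in doRun above):

    private static final Translog.Operation[] EMPTY_ARRAY = new Translog.Operation[0];

    static ResyncReplicationRequest toRequest(final ShardId shardId, final List<Translog.Operation> batch) {
        // equivalent to the inline operations.toArray(EMPTY_ARRAY) call in doRun
        return new ResyncReplicationRequest(shardId, batch.toArray(EMPTY_ARRAY));
    }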
55 changes: 32 additions & 23 deletions core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -33,7 +33,6 @@
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.uid.Versions;
@@ -861,7 +860,7 @@ public interface Snapshot extends Closeable {
* A generic interface representing an operation performed on the transaction log.
* Each is associated with a type.
*/
public interface Operation extends Writeable {
public interface Operation {
enum Type {
@Deprecated
CREATE((byte) 1),
@@ -890,7 +889,7 @@ public static Type fromId(byte id) {
case 4:
return NO_OP;
default:
throw new IllegalArgumentException("No type mapped for [" + id + "]");
throw new IllegalArgumentException("no type mapped for [" + id + "]");
}
}
}
@@ -907,31 +906,44 @@ public static Type fromId(byte id) {

/**
* Reads the type and the operation from the given stream. The operation must be written with
* {@link Operation#writeType(Operation, StreamOutput)}
* {@link Operation#writeOperation(StreamOutput, Operation)}
*/
static Operation readType(StreamInput input) throws IOException {
Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte());
static Operation readOperation(final StreamInput input) throws IOException {
final Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte());
switch (type) {
case CREATE:
// the deserialization logic in Index was identical to that of Create when create was deprecated
// the de-serialization logic in Index was identical to that of Create when create was deprecated
case INDEX:
return new Index(input);
case DELETE:
return new Delete(input);
case INDEX:
return new Index(input);
case NO_OP:
return new NoOp(input);
default:
throw new IOException("No type for [" + type + "]");
throw new AssertionError("no case for [" + type + "]");
}
}

/**
* Writes the type and translog operation to the given stream
*/
static void writeType(Translog.Operation operation, StreamOutput output) throws IOException {
static void writeOperation(final StreamOutput output, final Operation operation) throws IOException {
output.writeByte(operation.opType().id());
operation.writeTo(output);
switch(operation.opType()) {
case CREATE:
// the serialization logic in Index was identical to that of Create when create was deprecated
case INDEX:
((Index) operation).write(output);
break;
case DELETE:
((Delete) operation).write(output);
break;
case NO_OP:
((NoOp) operation).write(output);
break;
default:
throw new AssertionError("no case for [" + operation.opType() + "]");
}
}

}
@@ -968,7 +980,7 @@ public static class Index implements Operation {
private final String routing;
private final String parent;

public Index(StreamInput in) throws IOException {
private Index(final StreamInput in) throws IOException {
final int format = in.readVInt(); // SERIALIZATION_FORMAT
assert format >= FORMAT_2_X : "format was: " + format;
id = in.readString();
@@ -1081,8 +1093,7 @@ public Source getSource() {
return new Source(source, routing, parent);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
private void write(final StreamOutput out) throws IOException {
out.writeVInt(SERIALIZATION_FORMAT);
out.writeString(id);
out.writeString(type);
@@ -1170,7 +1181,7 @@ public static class Delete implements Operation {
private final long version;
private final VersionType versionType;

public Delete(StreamInput in) throws IOException {
private Delete(final StreamInput in) throws IOException {
final int format = in.readVInt();// SERIALIZATION_FORMAT
assert format >= FORMAT_5_0 : "format was: " + format;
if (format >= FORMAT_SINGLE_TYPE) {
@@ -1265,8 +1276,7 @@ public Source getSource() {
throw new IllegalStateException("trying to read doc source from delete operation");
}

@Override
public void writeTo(StreamOutput out) throws IOException {
private void write(final StreamOutput out) throws IOException {
out.writeVInt(SERIALIZATION_FORMAT);
out.writeString(type);
out.writeString(id);
@@ -1336,7 +1346,7 @@ public String reason() {
return reason;
}

NoOp(final StreamInput in) throws IOException {
private NoOp(final StreamInput in) throws IOException {
seqNo = in.readLong();
primaryTerm = in.readLong();
reason = in.readString();
@@ -1351,8 +1361,7 @@ public NoOp(final long seqNo, final long primaryTerm, final String reason) {
this.reason = reason;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
private void write(final StreamOutput out) throws IOException {
out.writeLong(seqNo);
out.writeLong(primaryTerm);
out.writeString(reason);
@@ -1454,7 +1463,7 @@ static Translog.Operation readOperation(BufferedChecksumStreamInput in) throws I
verifyChecksum(in);
in.reset();
}
operation = Translog.Operation.readType(in);
operation = Translog.Operation.readOperation(in);
verifyChecksum(in);
} catch (TranslogCorruptedException e) {
throw e;
@@ -1497,7 +1506,7 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl
// because closing it closes the underlying stream, which we don't
// want to do here.
out.resetDigest();
Translog.Operation.writeType(op, out);
Translog.Operation.writeOperation(out, op);
long checksum = out.getChecksum();
out.writeInt((int) checksum);
}
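The readOperation/writeOperation pair introduced above is now the single, type-prefixed entry point for translog operation serialization: Operation no longer extends Writeable, and the per-class write methods are private. A minimal round-trip sketch, borrowing the operation values from the new test below:

    final byte[] source = "{}".getBytes(Charset.forName("UTF-8"));
    final Translog.Operation op =
            new Translog.Index("type", "id", 0, Versions.MATCH_ANY, VersionType.INTERNAL, source, null, null, -1);

    final BytesStreamOutput out = new BytesStreamOutput();
    Translog.Operation.writeOperation(out, op);   // writes the type byte, then the operation body
    final Translog.Operation copy = Translog.Operation.readOperation(out.bytes().streamInput());
    assert copy.equals(op);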
@@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.resync;

import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.nio.charset.Charset;

import static org.hamcrest.Matchers.equalTo;

public class ResyncReplicationRequestTests extends ESTestCase {

public void testSerialization() throws IOException {
final byte[] bytes = "{}".getBytes(Charset.forName("UTF-8"));
final Translog.Index index = new Translog.Index("type", "id", 0, Versions.MATCH_ANY, VersionType.INTERNAL, bytes, null, null, -1);
final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, new Translog.Operation[]{index});

final BytesStreamOutput out = new BytesStreamOutput();
before.writeTo(out);

final StreamInput in = out.bytes().streamInput();
final ResyncReplicationRequest after = new ResyncReplicationRequest();
after.readFrom(in);

assertThat(after, equalTo(before));
}

}
@@ -48,7 +48,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
AtomicBoolean syncActionCalled = new AtomicBoolean();
PrimaryReplicaSyncer.SyncAction syncAction =
(request, parentTask, allocationId, primaryTerm, listener) -> {
logger.info("Sending off {} operations", request.getOperations().size());
logger.info("Sending off {} operations", request.getOperations().length);
syncActionCalled.set(true);
assertThat(parentTask, instanceOf(PrimaryReplicaSyncer.ResyncTask.class));
listener.onResponse(new ResyncReplicationResponse());
@@ -98,7 +98,7 @@ public void testSyncerOnClosingShard() throws Exception {
CountDownLatch syncCalledLatch = new CountDownLatch(1);
PrimaryReplicaSyncer.SyncAction syncAction =
(request, parentTask, allocationId, primaryTerm, listener) -> {
logger.info("Sending off {} operations", request.getOperations().size());
logger.info("Sending off {} operations", request.getOperations().length);
syncActionCalled.set(true);
syncCalledLatch.countDown();
threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse()));