Skip to content

Commit

Permalink
applied small feedback points
Browse files Browse the repository at this point in the history
  • Loading branch information
martijnvg committed May 7, 2018
1 parent 22b0e44 commit 5ea5c4d
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -204,10 +203,7 @@ public boolean equals(final Object o) {

@Override
public int hashCode() {
int result = 1;
result += Objects.hashCode(indexMetadataVersion);
result += Arrays.hashCode(operations);
return result;
return Objects.hash(indexMetadataVersion, operations);
}
}

Expand Down Expand Up @@ -240,8 +236,10 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
IndexShard indexShard = indexService.getShard(request.getShard().id());

IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
return getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes, indexMetaData);
final long indexMetaDataVersion = clusterService.state().metaData().index(shardId.getIndex()).getVersion();
final Translog.Operation[] operations =
getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes);
return new Response(indexMetaDataVersion, operations);
}

@Override
Expand All @@ -266,8 +264,7 @@ protected Response newResponse() {

private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0];

static Response getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo, long byteLimit,
IndexMetaData indexMetaData) throws IOException {
static Translog.Operation[] getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo, long byteLimit) throws IOException {
if (indexShard.state() != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state());
}
Expand All @@ -291,17 +288,17 @@ static Response getOperationsBetween(IndexShard indexShard, long minSeqNo, long
seenBytes += orderedOp.estimateSize();
operations.add(orderedOp);
if (nextExpectedSeqNo > maxSeqNo) {
return new Response(indexMetaData.getVersion(), operations.toArray(EMPTY_OPERATIONS_ARRAY));
return operations.toArray(EMPTY_OPERATIONS_ARRAY);
}
} else {
return new Response(indexMetaData.getVersion(), operations.toArray(EMPTY_OPERATIONS_ARRAY));
return operations.toArray(EMPTY_OPERATIONS_ARRAY);
}
}
}
}

if (nextExpectedSeqNo >= maxSeqNo) {
return new Response(indexMetaData.getVersion(), operations.toArray(EMPTY_OPERATIONS_ARRAY));
return operations.toArray(EMPTY_OPERATIONS_ARRAY);
} else {
String message = "Not all operations between min_seq_no [" + minSeqNo + "] and max_seq_no [" + maxSeqNo +
"] found, tracker checkpoint [" + nextExpectedSeqNo + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,21 +369,23 @@ protected void doRun() throws Exception {
"] times, aborting...", e));
}
return;
}final BulkShardOperationsRequest request = new BulkShardOperationsRequest(followerShard, response.getOperations());
followerClient.execute(BulkShardOperationsAction.INSTANCE, request, new ActionListener<BulkShardOperationsResponse>() {
@Override
public void onResponse(final BulkShardOperationsResponse bulkShardOperationsResponse) {
handler.accept(null);
}

final BulkShardOperationsRequest request = new BulkShardOperationsRequest(followerShard, response.getOperations());
followerClient.execute(BulkShardOperationsAction.INSTANCE, request, new ActionListener<BulkShardOperationsResponse>() {
@Override
public void onFailure(final Exception e) {
// No retry mechanism here, because if a failure is being redirected to this place it is considered
// non recoverable.
assert e != null;
handler.accept(e);
public void onResponse(final BulkShardOperationsResponse bulkShardOperationsResponse) {
handler.accept(null);
}

@Override
public void onFailure(final Exception e) {
// No retry mechanism here, because if a failure is being redirected to this place it is considered
// non recoverable.
assert e != null;
handler.accept(e);
}
}
});
);
});
}
});
Expand Down Expand Up @@ -416,8 +418,8 @@ static final class IndexMetadataVersionChecker implements BiConsumer<Long, Consu
}

public void accept(Long minimumRequiredIndexMetadataVersion, Consumer<Exception> handler) {
if (minimumRequiredIndexMetadataVersion <= currentIndexMetadataVersion.get()) {
LOGGER.info("Current index metadata version [{}] is higher or equal than minimum required index metadata version [{}]",
if (currentIndexMetadataVersion.get() >= minimumRequiredIndexMetadataVersion) {
LOGGER.debug("Current index metadata version [{}] is higher or equal than minimum required index metadata version [{}]",
currentIndexMetadataVersion.get(), minimumRequiredIndexMetadataVersion);
handler.accept(null);
} else {
Expand All @@ -433,6 +435,7 @@ void updateMapping(Consumer<Exception> handler) {

leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
assert indexMetaData.getMappings().size() == 1;
MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value;

PutMappingRequest putMappingRequest = new PutMappingRequest(followIndex.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,26 +53,25 @@ public void testGetOperationsBetween() throws Exception {
int min = randomIntBetween(0, numWrites - 1);
int max = randomIntBetween(min, numWrites - 1);

final ShardChangesAction.Response r =
ShardChangesAction.getOperationsBetween(indexShard, min, max, Long.MAX_VALUE, indexMetaData);
final Translog.Operation[] operations = ShardChangesAction.getOperationsBetween(indexShard, min, max, Long.MAX_VALUE);
/*
* We are not guaranteed that operations are returned to us in order they are in the translog (if our read crosses multiple
* generations) so the best we can assert is that we see the expected operations.
*/
final Set<Long> seenSeqNos = Arrays.stream(r.getOperations()).map(Translog.Operation::seqNo).collect(Collectors.toSet());
final Set<Long> seenSeqNos = Arrays.stream(operations).map(Translog.Operation::seqNo).collect(Collectors.toSet());
final Set<Long> expectedSeqNos = LongStream.range(min, max + 1).boxed().collect(Collectors.toSet());
assertThat(seenSeqNos, equalTo(expectedSeqNos));
}

// get operations for a range no operations exists:
Exception e = expectThrows(IllegalStateException.class,
() -> ShardChangesAction.getOperationsBetween(indexShard, numWrites, numWrites + 1, Long.MAX_VALUE, indexMetaData));
() -> ShardChangesAction.getOperationsBetween(indexShard, numWrites, numWrites + 1, Long.MAX_VALUE));
assertThat(e.getMessage(), containsString("Not all operations between min_seq_no [" + numWrites + "] and max_seq_no [" +
(numWrites + 1) +"] found, tracker checkpoint ["));

// get operations for a range some operations do not exist:
e = expectThrows(IllegalStateException.class,
() -> ShardChangesAction.getOperationsBetween(indexShard, numWrites - 10, numWrites + 10, Long.MAX_VALUE, indexMetaData));
() -> ShardChangesAction.getOperationsBetween(indexShard, numWrites - 10, numWrites + 10, Long.MAX_VALUE));
assertThat(e.getMessage(), containsString("Not all operations between min_seq_no [" + (numWrites - 10) + "] and max_seq_no [" +
(numWrites + 10) +"] found, tracker checkpoint ["));
}
Expand All @@ -90,8 +89,7 @@ public void testGetOperationsBetweenWhenShardNotStarted() throws Exception {

ShardRouting shardRouting = TestShardRouting.newShardRouting("index", 0, "_node_id", true, ShardRoutingState.INITIALIZING);
Mockito.when(indexShard.routingEntry()).thenReturn(shardRouting);
expectThrows(IndexShardNotStartedException.class,
() -> ShardChangesAction.getOperationsBetween(indexShard, 0, 1, Long.MAX_VALUE, indexMetaData));
expectThrows(IndexShardNotStartedException.class, () -> ShardChangesAction.getOperationsBetween(indexShard, 0, 1, Long.MAX_VALUE));
}

public void testGetOperationsBetweenExceedByteLimit() throws Exception {
Expand All @@ -107,21 +105,20 @@ public void testGetOperationsBetweenExceedByteLimit() throws Exception {
}

final IndexShard indexShard = indexService.getShard(0);
final ShardChangesAction.Response r =
ShardChangesAction.getOperationsBetween(indexShard, 0, numWrites - 1, 256, indexService.getMetaData());
assertThat(r.getOperations().length, equalTo(12));
assertThat(r.getOperations()[0].seqNo(), equalTo(0L));
assertThat(r.getOperations()[1].seqNo(), equalTo(1L));
assertThat(r.getOperations()[2].seqNo(), equalTo(2L));
assertThat(r.getOperations()[3].seqNo(), equalTo(3L));
assertThat(r.getOperations()[4].seqNo(), equalTo(4L));
assertThat(r.getOperations()[5].seqNo(), equalTo(5L));
assertThat(r.getOperations()[6].seqNo(), equalTo(6L));
assertThat(r.getOperations()[7].seqNo(), equalTo(7L));
assertThat(r.getOperations()[8].seqNo(), equalTo(8L));
assertThat(r.getOperations()[9].seqNo(), equalTo(9L));
assertThat(r.getOperations()[10].seqNo(), equalTo(10L));
assertThat(r.getOperations()[11].seqNo(), equalTo(11L));
final Translog.Operation[] operations = ShardChangesAction.getOperationsBetween(indexShard, 0, numWrites - 1, 256);
assertThat(operations.length, equalTo(12));
assertThat(operations[0].seqNo(), equalTo(0L));
assertThat(operations[1].seqNo(), equalTo(1L));
assertThat(operations[2].seqNo(), equalTo(2L));
assertThat(operations[3].seqNo(), equalTo(3L));
assertThat(operations[4].seqNo(), equalTo(4L));
assertThat(operations[5].seqNo(), equalTo(5L));
assertThat(operations[6].seqNo(), equalTo(6L));
assertThat(operations[7].seqNo(), equalTo(7L));
assertThat(operations[8].seqNo(), equalTo(8L));
assertThat(operations[9].seqNo(), equalTo(9L));
assertThat(operations[10].seqNo(), equalTo(10L));
assertThat(operations[11].seqNo(), equalTo(11L));
}

}

0 comments on commit 5ea5c4d

Please sign in to comment.