Skip to content

Commit

Permalink
Merge branch 'master' into ccr
Browse files Browse the repository at this point in the history
* master:
  Removing erroneous repeat
  Adapt bwc versions after backporting #30983 to 6.4
  [Tests] Muting RatedRequestsTests#testXContentParsingIsNotLenient
  TEST:  Retry synced-flush if ongoing ops on primary (#30978)
  Fix docs build.
  Only auto-update license signature if all nodes ready (#30859)
  Add BlobContainer.writeBlobAtomic() (#30902)
  Add a doc value format to binary fields. (#30860)
  • Loading branch information
dnhatn committed Jun 5, 2018
2 parents 530089f + 14c4088 commit 2b5c0d7
Show file tree
Hide file tree
Showing 40 changed files with 406 additions and 216 deletions.
2 changes: 0 additions & 2 deletions buildSrc/src/main/resources/checkstyle_suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,6 @@
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]settings[/\\]ClusterSettingsIT.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]shards[/\\]ClusterSearchShardsIT.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]structure[/\\]RoutingIteratorTests.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]blobstore[/\\]FsBlobStoreContainerTests.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]blobstore[/\\]FsBlobStoreTests.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]breaker[/\\]MemoryCircuitBreakerTests.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]geo[/\\]ShapeBuilderTests.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]hash[/\\]MessageDigestsTests.java" checks="LineLength" />
Expand Down
1 change: 0 additions & 1 deletion docs/reference/query-dsl/query-string-syntax.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ would look like this:
}
}

****

===== Grouping

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public void testXContentRoundtrip() throws IOException {
}
}

@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/31104")
public void testXContentParsingIsNotLenient() throws IOException {
RatedRequest testItem = createTestItem(randomBoolean());
XContentType xContentType = randomFrom(XContentType.values());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public RoutingExplanations getExplanations() {

@Override
public void readFrom(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
super.readFrom(in);
state = ClusterState.readFrom(in, null);
explanations = RoutingExplanations.readFrom(in);
Expand All @@ -76,7 +76,7 @@ public void readFrom(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
if (out.getVersion().onOrAfter(Version.V_6_4_0)) {
super.writeTo(out);
state.writeTo(out);
RoutingExplanations.writeTo(explanations, out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class ClusterUpdateSettingsResponse extends AcknowledgedResponse {

@Override
public void readFrom(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
super.readFrom(in);
transientSettings = Settings.readSettingsFromStream(in);
persistentSettings = Settings.readSettingsFromStream(in);
Expand All @@ -89,7 +89,7 @@ public Settings getPersistentSettings() {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
if (out.getVersion().onOrAfter(Version.V_6_4_0)) {
super.writeTo(out);
Settings.writeSettingsToStream(transientSettings, out);
Settings.writeSettingsToStream(persistentSettings, out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public boolean isRolledOver() {

@Override
public void readFrom(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
super.readFrom(in);
oldIndex = in.readString();
newIndex = in.readString();
Expand Down Expand Up @@ -144,7 +144,7 @@ public void readFrom(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
if (out.getVersion().onOrAfter(Version.V_6_4_0)) {
super.writeTo(out);
out.writeString(oldIndex);
out.writeString(newIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,29 @@ public interface BlobContainer {
*/
void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException;

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name,
* using an atomic write operation if the implementation supports it. When the BlobContainer implementation
* does not provide a specific implementation of writeBlobAtomic(String, InputStream, long), then
* the {@link #writeBlob(String, InputStream, long)} method is used.
*
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the
* same name already exists, the operation will fail and an {@link IOException} will be thrown.
*
* @param blobName
* The name of the blob to write the contents of the input stream to.
* @param inputStream
* The input stream from which to retrieve the bytes to write to the blob.
* @param blobSize
* The size of the blob to be written, in bytes. It is implementation dependent whether
* this value is used in writing the blob to the repository.
* @throws FileAlreadyExistsException if a blob by the same name already exists
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
default void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException {
writeBlob(blobName, inputStream, blobSize);
}

/**
* Deletes a blob with giving name, if the blob exists. If the blob does not exist,
* this method throws a NoSuchFileException.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

package org.elasticsearch.common.blobstore.fs;

import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.core.internal.io.Streams;

import java.io.BufferedInputStream;
Expand Down Expand Up @@ -56,8 +57,9 @@
*/
public class FsBlobContainer extends AbstractBlobContainer {

protected final FsBlobStore blobStore;
private static final String TEMP_FILE_PREFIX = "pending-";

protected final FsBlobStore blobStore;
protected final Path path;

public FsBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path) {
Expand Down Expand Up @@ -131,6 +133,48 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize) t
IOUtils.fsync(path, true);
}

@Override
public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException {
final String tempBlob = tempBlobName(blobName);
final Path tempBlobPath = path.resolve(tempBlob);
try {
try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
Streams.copy(inputStream, outputStream);
}
IOUtils.fsync(tempBlobPath, false);

final Path blobPath = path.resolve(blobName);
// If the target file exists then Files.move() behaviour is implementation specific
// the existing file might be replaced or this method fails by throwing an IOException.
if (Files.exists(blobPath)) {
throw new FileAlreadyExistsException("blob [" + blobPath + "] already exists, cannot overwrite");
}
Files.move(tempBlobPath, blobPath, StandardCopyOption.ATOMIC_MOVE);
} catch (IOException ex) {
try {
deleteBlobIgnoringIfNotExists(tempBlob);
} catch (IOException e) {
ex.addSuppressed(e);
}
throw ex;
} finally {
IOUtils.fsync(path, true);
}
}

public static String tempBlobName(final String blobName) {
return "pending-" + blobName + "-" + UUIDs.randomBase64UUID();
}

/**
* Returns true if the blob is a leftover temporary blob.
*
* The temporary blobs might be left after failed atomic write operation.
*/
public static boolean isTempBlobName(final String blobName) {
return blobName.startsWith(TEMP_FILE_PREFIX);
}

@Override
public void move(String source, String target) throws IOException {
Path sourcePath = path.resolve(source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.elasticsearch.index.fielddata.plain.BytesBinaryDVIndexFieldData;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.QueryShardException;
import org.elasticsearch.search.DocValueFormat;
import org.joda.time.DateTimeZone;

import java.io.IOException;
import java.util.Base64;
Expand Down Expand Up @@ -104,6 +106,10 @@ public String typeName() {
return CONTENT_TYPE;
}

@Override
public DocValueFormat docValueFormat(String format, DateTimeZone timeZone) {
return DocValueFormat.BINARY;
}

@Override
public BytesReference valueForDisplay(Object value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,18 +507,7 @@ private InFlightOpsResponse performInFlightOps(InFlightOpsRequest request) {
if (indexShard.routingEntry().primary() == false) {
throw new IllegalStateException("[" + request.shardId() +"] expected a primary shard");
}
if (Assertions.ENABLED) {
if (logger.isTraceEnabled()) {
logger.trace("in flight operations {}, acquirers {}", indexShard.getActiveOperationsCount(), indexShard.getActiveOperations());
}
}
int opCount = indexShard.getActiveOperationsCount();
// Need to snapshot the debug info twice as it's updated concurrently with the permit count.
if (Assertions.ENABLED) {
if (logger.isTraceEnabled()) {
logger.trace("in flight operations {}, acquirers {}", indexShard.getActiveOperationsCount(), indexShard.getActiveOperations());
}
}
return new InFlightOpsResponse(opCount);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
Expand Down Expand Up @@ -555,10 +556,8 @@ public String startVerification() {
String blobName = "master.dat";
BytesArray bytes = new BytesArray(testBytes);
try (InputStream stream = bytes.streamInput()) {
testContainer.writeBlob(blobName + "-temp", stream, bytes.length());
testContainer.writeBlobAtomic(blobName, stream, bytes.length());
}
// Make sure that move is supported
testContainer.move(blobName + "-temp", blobName);
return seed;
}
} catch (IOException exp) {
Expand Down Expand Up @@ -774,18 +773,8 @@ private long listBlobsToGetLatestIndexId() throws IOException {
}

private void writeAtomic(final String blobName, final BytesReference bytesRef) throws IOException {
final String tempBlobName = "pending-" + blobName + "-" + UUIDs.randomBase64UUID();
try (InputStream stream = bytesRef.streamInput()) {
snapshotsBlobContainer.writeBlob(tempBlobName, stream, bytesRef.length());
snapshotsBlobContainer.move(tempBlobName, blobName);
} catch (IOException ex) {
// temporary blob creation or move failed - try cleaning up
try {
snapshotsBlobContainer.deleteBlobIgnoringIfNotExists(tempBlobName);
} catch (IOException e) {
ex.addSuppressed(e);
}
throw ex;
snapshotsBlobContainer.writeBlobAtomic(blobName, stream, bytesRef.length());
}
}

Expand Down Expand Up @@ -955,7 +944,7 @@ protected void finalize(final List<SnapshotFiles> snapshots,
// Delete temporary index files first, as we might otherwise fail in the next step creating the new index file if an earlier
// attempt to write an index file with this generation failed mid-way after creating the temporary file.
for (final String blobName : blobs.keySet()) {
if (indexShardSnapshotsFormat.isTempBlobName(blobName)) {
if (FsBlobContainer.isTempBlobName(blobName)) {
try {
blobContainer.deleteBlobIgnoringIfNotExists(blobName);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.bytes.BytesArray;
Expand Down Expand Up @@ -52,8 +53,6 @@
*/
public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreFormat<T> {

private static final String TEMP_FILE_PREFIX = "pending-";

private static final XContentType DEFAULT_X_CONTENT_TYPE = XContentType.SMILE;

// The format version
Expand Down Expand Up @@ -120,7 +119,7 @@ public T readBlob(BlobContainer blobContainer, String blobName) throws IOExcepti
}

/**
* Writes blob in atomic manner with resolving the blob name using {@link #blobName} and {@link #tempBlobName} methods.
* Writes blob in atomic manner with resolving the blob name using {@link #blobName} method.
* <p>
* The blob will be compressed and checksum will be written if required.
*
Expand All @@ -131,20 +130,12 @@ public T readBlob(BlobContainer blobContainer, String blobName) throws IOExcepti
* @param name blob name
*/
public void writeAtomic(T obj, BlobContainer blobContainer, String name) throws IOException {
String blobName = blobName(name);
String tempBlobName = tempBlobName(name);
writeBlob(obj, blobContainer, tempBlobName);
try {
blobContainer.move(tempBlobName, blobName);
} catch (IOException ex) {
// Move failed - try cleaning up
try {
blobContainer.deleteBlob(tempBlobName);
} catch (Exception e) {
ex.addSuppressed(e);
final String blobName = blobName(name);
writeTo(obj, blobName, bytesArray -> {
try (InputStream stream = bytesArray.streamInput()) {
blobContainer.writeBlobAtomic(blobName, stream, bytesArray.length());
}
throw ex;
}
});
}

/**
Expand All @@ -157,51 +148,35 @@ public void writeAtomic(T obj, BlobContainer blobContainer, String name) throws
* @param name blob name
*/
public void write(T obj, BlobContainer blobContainer, String name) throws IOException {
String blobName = blobName(name);
writeBlob(obj, blobContainer, blobName);
final String blobName = blobName(name);
writeTo(obj, blobName, bytesArray -> {
try (InputStream stream = bytesArray.streamInput()) {
blobContainer.writeBlob(blobName, stream, bytesArray.length());
}
});
}

/**
* Writes blob in atomic manner without resolving the blobName using using {@link #blobName} method.
* <p>
* The blob will be compressed and checksum will be written if required.
*
* @param obj object to be serialized
* @param blobContainer blob container
* @param blobName blob name
*/
protected void writeBlob(T obj, BlobContainer blobContainer, String blobName) throws IOException {
BytesReference bytes = write(obj);
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
private void writeTo(final T obj, final String blobName, final CheckedConsumer<BytesArray, IOException> consumer) throws IOException {
final BytesReference bytes = write(obj);
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
final String resourceDesc = "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")";
try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, blobName, byteArrayOutputStream, BUFFER_SIZE)) {
try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, blobName, outputStream, BUFFER_SIZE)) {
CodecUtil.writeHeader(indexOutput, codec, VERSION);
try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) {
@Override
public void close() throws IOException {
// this is important since some of the XContentBuilders write bytes on close.
// in order to write the footer we need to prevent closing the actual index input.
} }) {
}
}) {
bytes.writeTo(indexOutputOutputStream);
}
CodecUtil.writeFooter(indexOutput);
}
BytesArray bytesArray = new BytesArray(byteArrayOutputStream.toByteArray());
try (InputStream stream = bytesArray.streamInput()) {
blobContainer.writeBlob(blobName, stream, bytesArray.length());
}
consumer.accept(new BytesArray(outputStream.toByteArray()));
}
}

/**
* Returns true if the blob is a leftover temporary blob.
*
* The temporary blobs might be left after failed atomic write operation.
*/
public boolean isTempBlobName(String blobName) {
return blobName.startsWith(ChecksumBlobStoreFormat.TEMP_FILE_PREFIX);
}

protected BytesReference write(T obj) throws IOException {
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
if (compress) {
Expand All @@ -222,10 +197,4 @@ protected void write(T obj, StreamOutput streamOutput) throws IOException {
builder.endObject();
}
}


protected String tempBlobName(String name) {
return TEMP_FILE_PREFIX + String.format(Locale.ROOT, blobNameFormat, name);
}

}
Loading

0 comments on commit 2b5c0d7

Please sign in to comment.