Skip to content

Commit

Permalink
[Remote Translog] Introduce remote translog transfer support (#4480) (#…
Browse files Browse the repository at this point in the history
…5773)

* Introduce remote translog transfer support

Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
  • Loading branch information
sachinpkale authored Jan 10, 2023
1 parent 05e4365 commit f9ea95e
Show file tree
Hide file tree
Showing 18 changed files with 1,209 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283))
- Pre conditions check before updating weighted routing metadata ([#4955](https://github.com/opensearch-project/OpenSearch/pull/4955))
- Support remote translog transfer for request level durability([#4480](https://github.com/opensearch-project/OpenSearch/pull/4480))

### Deprecated
- Refactor fuzziness interface on query builders ([#5433](https://github.com/opensearch-project/OpenSearch/pull/5433))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.common.io.stream.FilterStreamInput;
import org.opensearch.common.io.stream.StreamInput;

import java.io.EOFException;
import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
Expand Down Expand Up @@ -117,7 +118,11 @@ public void reset() throws IOException {

@Override
public int read() throws IOException {
return readByte() & 0xFF;
try {
return readByte() & 0xFF;
} catch (EOFException e) {
return -1;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
*
* @opensearch.internal
*/
final class Checkpoint {
final public class Checkpoint {

final long offset;
final int numOps;
Expand Down Expand Up @@ -262,6 +262,14 @@ public synchronized byte[] toByteArray() {
return byteOutputStream.toByteArray();
}

public long getMinTranslogGeneration() {
return minTranslogGeneration;
}

public long getGeneration() {
return generation;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public int totalOperations() {
}

@Override
final Checkpoint getCheckpoint() {
final public Checkpoint getCheckpoint() {
return checkpoint;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.translog.transfer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;

/**
* Service that handles remote transfer of translog and checkpoint files
*
* @opensearch.internal
*/
public class BlobStoreTransferService implements TransferService {

private final BlobStore blobStore;
private final ExecutorService executorService;

private static final Logger logger = LogManager.getLogger(BlobStoreTransferService.class);

public BlobStoreTransferService(BlobStore blobStore, ExecutorService executorService) {
this.blobStore = blobStore;
this.executorService = executorService;
}

@Override
public void uploadBlobAsync(
final TransferFileSnapshot fileSnapshot,
Iterable<String> remoteTransferPath,
ActionListener<TransferFileSnapshot> listener
) {
assert remoteTransferPath instanceof BlobPath;
BlobPath blobPath = (BlobPath) remoteTransferPath;
executorService.execute(ActionRunnable.wrap(listener, l -> {
try (InputStream inputStream = fileSnapshot.inputStream()) {
blobStore.blobContainer(blobPath)
.writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true);
l.onResponse(fileSnapshot);
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e);
l.onFailure(new FileTransferException(fileSnapshot, e));
}
}));
}

@Override
public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable<String> remoteTransferPath) throws IOException {
assert remoteTransferPath instanceof BlobPath;
BlobPath blobPath = (BlobPath) remoteTransferPath;
try (InputStream inputStream = fileSnapshot.inputStream()) {
blobStore.blobContainer(blobPath).writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true);
} catch (Exception ex) {
throw ex;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.translog.transfer;

import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.BytesStreamInput;
import org.opensearch.common.io.stream.InputStreamStreamInput;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.translog.BufferedChecksumStreamInput;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Objects;

/**
* Snapshot of a single file that gets transferred
*
* @opensearch.internal
*/
public class FileSnapshot implements Closeable {

private final String name;
@Nullable
private final FileChannel fileChannel;
@Nullable
private Path path;
@Nullable
private byte[] content;

private FileSnapshot(Path path) throws IOException {
Objects.requireNonNull(path);
this.name = path.getFileName().toString();
this.path = path;
this.fileChannel = FileChannel.open(path, StandardOpenOption.READ);
}

private FileSnapshot(String name, byte[] content) {
Objects.requireNonNull(name);
this.name = name;
this.content = content;
this.fileChannel = null;
}

public Path getPath() {
return path;
}

public String getName() {
return name;
}

public long getContentLength() throws IOException {
return fileChannel == null ? content.length : fileChannel.size();
}

public InputStream inputStream() throws IOException {
return fileChannel != null
? new BufferedChecksumStreamInput(
new InputStreamStreamInput(Channels.newInputStream(fileChannel), fileChannel.size()),
path.toString()
)
: new BufferedChecksumStreamInput(new BytesStreamInput(content), name);
}

@Override
public int hashCode() {
return Objects.hash(name, content, path);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FileSnapshot other = (FileSnapshot) o;
return Objects.equals(this.name, other.name)
&& Objects.equals(this.content, other.content)
&& Objects.equals(this.path, other.path);
}

@Override
public String toString() {
return new StringBuilder("FileInfo [").append(" name = ")
.append(name)
.append(", path = ")
.append(path.toUri())
.append("]")
.toString();
}

@Override
public void close() throws IOException {
IOUtils.close(fileChannel);
}

/**
* Snapshot of a single file with primary term that gets transferred
*
* @opensearch.internal
*/
public static class TransferFileSnapshot extends FileSnapshot {

private final long primaryTerm;

public TransferFileSnapshot(Path path, long primaryTerm) throws IOException {
super(path);
this.primaryTerm = primaryTerm;
}

public TransferFileSnapshot(String name, byte[] content, long primaryTerm) throws IOException {
super(name, content);
this.primaryTerm = primaryTerm;
}

public long getPrimaryTerm() {
return primaryTerm;
}

@Override
public int hashCode() {
return Objects.hash(primaryTerm, super.hashCode());
}

@Override
public boolean equals(Object o) {
if (super.equals(o)) {
if (this == o) return true;
if (getClass() != o.getClass()) return false;
TransferFileSnapshot other = (TransferFileSnapshot) o;
return Objects.equals(this.primaryTerm, other.primaryTerm);
}
return false;
}
}

/**
* Snapshot of a single .tlg file that gets transferred
*
* @opensearch.internal
*/
public static final class TranslogFileSnapshot extends TransferFileSnapshot {

private final long generation;

public TranslogFileSnapshot(long primaryTerm, long generation, Path path) throws IOException {
super(path, primaryTerm);
this.generation = generation;
}

public long getGeneration() {
return generation;
}

@Override
public int hashCode() {
return Objects.hash(generation, super.hashCode());
}

@Override
public boolean equals(Object o) {
if (super.equals(o)) {
if (this == o) return true;
if (getClass() != o.getClass()) return false;
TranslogFileSnapshot other = (TranslogFileSnapshot) o;
return Objects.equals(this.generation, other.generation);
}
return false;
}
}

/**
* Snapshot of a single .ckp file that gets transferred
*
* @opensearch.internal
*/
public static final class CheckpointFileSnapshot extends TransferFileSnapshot {

private final long generation;

private final long minTranslogGeneration;

public CheckpointFileSnapshot(long primaryTerm, long generation, long minTranslogGeneration, Path path) throws IOException {
super(path, primaryTerm);
this.minTranslogGeneration = minTranslogGeneration;
this.generation = generation;
}

public long getGeneration() {
return generation;
}

public long getMinTranslogGeneration() {
return minTranslogGeneration;
}

@Override
public int hashCode() {
return Objects.hash(generation, minTranslogGeneration, super.hashCode());
}

@Override
public boolean equals(Object o) {
if (super.equals(o)) {
if (this == o) return true;
if (getClass() != o.getClass()) return false;
CheckpointFileSnapshot other = (CheckpointFileSnapshot) o;
return Objects.equals(this.minTranslogGeneration, other.minTranslogGeneration)
&& Objects.equals(this.generation, other.generation);
}
return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.translog.transfer;

import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;

/**
* Exception when a single file transfer encounters a failure
*
* @opensearch.internal
*/
public class FileTransferException extends RuntimeException {

private final TransferFileSnapshot fileSnapshot;

public FileTransferException(TransferFileSnapshot fileSnapshot, Throwable cause) {
super(cause);
this.fileSnapshot = fileSnapshot;
}

public TransferFileSnapshot getFileSnapshot() {
return fileSnapshot;
}
}
Loading

0 comments on commit f9ea95e

Please sign in to comment.