@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.keyvalue.helpers;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
@@ -36,17 +37,20 @@
 import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
 import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
 import org.apache.hadoop.util.Time;
+import org.apache.ratis.util.function.CheckedSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousFileChannel;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
+import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.security.NoSuchAlgorithmException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*;
@@ -56,6 +60,8 @@
  */
 public final class ChunkUtils {
 
+  private static final Set<Path> LOCKS = ConcurrentHashMap.newKeySet();
+
   /** Never constructed. **/
   private ChunkUtils() {
 
@@ -67,9 +73,8 @@ private ChunkUtils() {
    * @param chunkFile - File to write data to.
    * @param chunkInfo - Data stream to write.
    * @param data - The data buffer.
-   * @param volumeIOStats
+   * @param volumeIOStats statistics collector
    * @param sync whether to do fsync or not
-   * @throws StorageContainerException
    */
   public static void writeData(File chunkFile, ChunkInfo chunkInfo,
       ByteBuffer data, VolumeIOStats volumeIOStats, boolean sync)
@@ -85,58 +90,43 @@ public static void writeData(File chunkFile, ChunkInfo chunkInfo,
       throw new StorageContainerException(err, INVALID_WRITE_SIZE);
     }
 
-    FileChannel file = null;
-    FileLock lock = null;
-
-    try {
-      long writeTimeStart = Time.monotonicNow();
-
-      // skip SYNC and DSYNC to reduce contention on file.lock
-      file = FileChannel.open(chunkFile.toPath(),
-          StandardOpenOption.CREATE,
-          StandardOpenOption.WRITE,
-          StandardOpenOption.SPARSE);
-
-      lock = file.lock();
-      int size = file.write(data, chunkInfo.getOffset());
-      // Increment volumeIO stats here.
-      volumeIOStats.incWriteTime(Time.monotonicNow() - writeTimeStart);
-      volumeIOStats.incWriteOpCount();
-      volumeIOStats.incWriteBytes(size);
-      if (size != bufferSize) {
-        log.error("Invalid write size found. Size:{} Expected: {} ", size,
-            bufferSize);
-        throw new StorageContainerException("Invalid write size found. " +
-            "Size: " + size + " Expected: " + bufferSize, INVALID_WRITE_SIZE);
-      }
-    } catch (StorageContainerException ex) {
-      throw ex;
-    } catch(IOException e) {
-      throw new StorageContainerException(e, IO_EXCEPTION);
-
-    } finally {
-      if (lock != null) {
-        try {
-          lock.release();
-        } catch (IOException e) {
-          log.error("Unable to release lock ??, Fatal Error.");
-          throw new StorageContainerException(e, CONTAINER_INTERNAL_ERROR);
-        }
-      }
-      if (file != null) {
-        try {
-          if (sync) {
-            // ensure data and metadata is persisted. Outside the lock
-            file.force(true);
-          }
-          file.close();
-        } catch (IOException e) {
-          throw new StorageContainerException("Error closing chunk file",
-              e, CONTAINER_INTERNAL_ERROR);
-        }
-      }
-    }
+    Path path = chunkFile.toPath();
+    long startTime = Time.monotonicNow();
+    processFileExclusively(path, () -> {
+      FileChannel file = null;
+      try {
+        // skip SYNC and DSYNC to reduce contention on file.lock
+        file = FileChannel.open(path,
+            StandardOpenOption.CREATE,
+            StandardOpenOption.WRITE,
+            StandardOpenOption.SPARSE);
+
+        int size;
+        try (FileLock ignored = file.lock()) {
+          size = file.write(data, chunkInfo.getOffset());
+        }
+
+        // Increment volumeIO stats here.
+        volumeIOStats.incWriteTime(Time.monotonicNow() - startTime);
+        volumeIOStats.incWriteOpCount();
+        volumeIOStats.incWriteBytes(size);
+        if (size != bufferSize) {
+          log.error("Invalid write size found. Size:{} Expected: {} ", size,
+              bufferSize);
+          throw new StorageContainerException("Invalid write size found. " +
+              "Size: " + size + " Expected: " + bufferSize, INVALID_WRITE_SIZE);
+        }
+      } catch (StorageContainerException ex) {
+        throw ex;
+      } catch (IOException e) {
+        throw new StorageContainerException(e, IO_EXCEPTION);
+      } finally {
+        closeFile(file, sync);
+      }
+
+      return null;
+    });
 
     log.debug("Write Chunk completed for chunkFile: {}, size {}", chunkFile,
         bufferSize);
   }
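
The rewritten write path wraps the whole operation in processFileExclusively (added at the end of this diff) and scopes the OS-level FileLock with try-with-resources, so the lock is released even when the write throws, without the old explicit release()-in-finally dance. A minimal standalone sketch of that pattern; the names WriteDemo, demoWrite, and /tmp/chunk.dat are hypothetical, not from this patch:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public final class WriteDemo {
  // Writes `data` at `offset`, holding an exclusive OS-level file lock
  // only for the duration of the write, as in the patched writeData().
  static int demoWrite(Path path, ByteBuffer data, long offset)
      throws IOException {
    try (FileChannel channel = FileChannel.open(path,
        StandardOpenOption.CREATE,
        StandardOpenOption.WRITE,
        StandardOpenOption.SPARSE)) {
      // try-with-resources releases the lock even if write() throws,
      // replacing the old explicit lock.release() in a finally block.
      try (FileLock ignored = channel.lock()) {
        return channel.write(data, offset);
      }
    }
  }

  public static void main(String[] args) throws IOException {
    Path p = Paths.get("/tmp/chunk.dat");
    int written = demoWrite(p, ByteBuffer.wrap("hello".getBytes()), 0);
    System.out.println("wrote " + written + " bytes");
  }
}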
@@ -146,11 +136,8 @@ public static void writeData(File chunkFile, ChunkInfo chunkInfo,
    *
    * @param chunkFile - file where data lives.
    * @param data - chunk definition.
-   * @param volumeIOStats
+   * @param volumeIOStats statistics collector
    * @return ByteBuffer
-   * @throws StorageContainerException
-   * @throws ExecutionException
-   * @throws InterruptedException
    */
   public static ByteBuffer readData(File chunkFile, ChunkInfo data,
       VolumeIOStats volumeIOStats) throws StorageContainerException,
@@ -165,38 +152,36 @@ public static ByteBuffer readData(File chunkFile, ChunkInfo data,
           data.toString(), UNABLE_TO_FIND_CHUNK);
     }
 
-    AsynchronousFileChannel file = null;
-    FileLock lock = null;
-    try {
-      long readStartTime = Time.monotonicNow();
-      file =
-          AsynchronousFileChannel.open(chunkFile.toPath(),
-              StandardOpenOption.READ);
-      lock = file.lock(data.getOffset(), data.getLen(), true).get();
-
-      ByteBuffer buf = ByteBuffer.allocate((int) data.getLen());
-      file.read(buf, data.getOffset()).get();
-
-      // Increment volumeIO stats here.
-      volumeIOStats.incReadTime(Time.monotonicNow() - readStartTime);
-      volumeIOStats.incReadOpCount();
-      volumeIOStats.incReadBytes(data.getLen());
-
-      return buf;
-    } catch (IOException e) {
-      throw new StorageContainerException(e, IO_EXCEPTION);
-    } finally {
-      if (lock != null) {
-        try {
-          lock.release();
-        } catch (IOException e) {
-          log.error("I/O error is lock release.");
-        }
-      }
-      if (file != null) {
-        IOUtils.closeStream(file);
-      }
-    }
+    long offset = data.getOffset();
+    long len = data.getLen();
+    ByteBuffer buf = ByteBuffer.allocate((int) len);
+
+    Path path = chunkFile.toPath();
+    long startTime = Time.monotonicNow();
+    return processFileExclusively(path, () -> {
+      FileChannel file = null;
+
+      try {
+        file = FileChannel.open(path, StandardOpenOption.READ);
+
+        try (FileLock ignored = file.lock(offset, len, true)) {
+          file.read(buf, offset);
+        }
+
+        // Increment volumeIO stats here.
+        volumeIOStats.incReadTime(Time.monotonicNow() - startTime);
+        volumeIOStats.incReadOpCount();
+        volumeIOStats.incReadBytes(len);
+
+        return buf;
+      } catch (IOException e) {
+        throw new StorageContainerException(e, IO_EXCEPTION);
+      } finally {
+        if (file != null) {
+          IOUtils.closeStream(file);
+        }
+      }
+    });
   }
 
 /**
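
One caveat when adapting this read path: a single FileChannel.read call may return fewer bytes than requested, so a buffer of getLen() bytes is not guaranteed to be filled in one call. A defensive read-fully loop, a sketch under that assumption rather than part of this patch, looks like:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

final class ReadFully {
  // Reads exactly buf.remaining() bytes starting at `offset`, or throws
  // if end-of-file is reached first. FileChannel.read() may return fewer
  // bytes than requested, so we loop and advance the position.
  static void readFully(FileChannel channel, ByteBuffer buf, long offset)
      throws IOException {
    long position = offset;
    while (buf.hasRemaining()) {
      int n = channel.read(buf, position);
      if (n < 0) {
        throw new IOException("Unexpected end of file at " + position);
      }
      position += n;
    }
  }
}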
@@ -326,4 +311,37 @@ public static ContainerCommandResponseProto getReadChunkResponse(
     builder.setReadChunk(response);
     return builder.build();
   }
+
+  @VisibleForTesting
+  static <T, E extends Exception> T processFileExclusively(
+      Path path, CheckedSupplier<T, E> op
+  ) throws E {
+    for (;;) {
+      if (LOCKS.add(path)) {
+        break;
+      }
+    }
+
+    try {
+      return op.get();
+    } finally {
+      LOCKS.remove(path);
+    }
+  }
+
+  private static void closeFile(FileChannel file, boolean sync)
+      throws StorageContainerException {
+    if (file != null) {
+      try {
+        if (sync) {
+          // ensure data and metadata is persisted
+          file.force(true);
+        }
+        file.close();
+      } catch (IOException e) {
+        throw new StorageContainerException("Error closing chunk file",
+            e, CONTAINER_INTERNAL_ERROR);
+      }
+    }
+  }
+}
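
processFileExclusively amounts to a spin lock keyed by path: ConcurrentHashMap.newKeySet() gives a thread-safe set whose add() is atomic, so the one thread whose add(path) returns true owns the path until the finally block removes it, and the Ratis CheckedSupplier lets the supplied operation propagate checked exceptions. A minimal standalone demonstration of the same idea; the ExclusiveDemo class is hypothetical and uses a plain Supplier instead of CheckedSupplier:

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public final class ExclusiveDemo {
  private static final Set<Path> LOCKS = ConcurrentHashMap.newKeySet();

  // Same shape as the patch, minus the checked-exception generics:
  // busy-wait until this thread's add() wins, run the operation,
  // then remove the path so a waiting thread can proceed.
  static <T> T processFileExclusively(Path path, Supplier<T> op) {
    while (!LOCKS.add(path)) {
      // spin until the current owner removes the path
    }
    try {
      return op.get();
    } finally {
      LOCKS.remove(path);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Path p = Paths.get("/tmp/chunk.dat");
    Runnable task = () -> processFileExclusively(p, () -> {
      System.out.println(Thread.currentThread().getName() + " holds " + p);
      return null;
    });
    Thread a = new Thread(task, "writer-1");
    Thread b = new Thread(task, "writer-2");
    a.start();
    b.start();
    a.join();
    b.join();
  }
}

Spinning is only reasonable here because each critical section is a single chunk read or write; for longer-running operations a blocking primitive (for example, a striped lock) would be the safer choice.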