Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue #52]: implement direct shared memory read. #53

Merged
merged 2 commits into from
Jan 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ public class ColumnletId
public long blockId;
public short rowGroupId;
public short columnId;
public boolean direct;
long cacheOffset;
int cacheLength;

public ColumnletId(short rowGroupId, short columnId)
public ColumnletId(short rowGroupId, short columnId, boolean direct)
{
this.rowGroupId = rowGroupId;
this.columnId = columnId;
this.direct = direct;
}

public ColumnletId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@
import sun.misc.Unsafe;
import sun.nio.ch.FileChannelImpl;

import java.io.FileDescriptor;
import java.io.RandomAccessFile;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/**
Expand All @@ -51,6 +55,8 @@ public class MemoryMappedFile
private static final Unsafe unsafe;
private static final Method mmap;
private static final Method unmmap;
// this is from sun.nio.ch.Util
private static volatile Constructor<?> directByteBufferRConstructor = null;
private static final int BYTE_ARRAY_OFFSET;

private long addr, size;
Expand All @@ -71,6 +77,22 @@ public class MemoryMappedFile
{
throw new RuntimeException(e);
}

// this is from sun.nio.ch.Util.initDBBRConstructor
try
{
Class<?> cl = Class.forName("java.nio.DirectByteBufferR");
Constructor<?> ctor = cl.getDeclaredConstructor(
new Class<?>[] { int.class, long.class, FileDescriptor.class, Runnable.class } );
ctor.setAccessible(true);
directByteBufferRConstructor = ctor;
} catch (ClassNotFoundException |
NoSuchMethodException |
IllegalArgumentException |
ClassCastException e)
{
throw new InternalError(e);
}
}

private static Method getMethod(Class<?> cls, String name, Class<?>... params)
Expand All @@ -81,6 +103,24 @@ private static Method getMethod(Class<?> cls, String name, Class<?>... params)
return m;
}

// this is derived from sun.nio.ch.Util.newMappedByteBufferR
// create a read only direct byte buffer without memory copy.
static ByteBuffer newDirectByteBufferR(int size, long addr)
{
ByteBuffer buffer;
try
{
buffer = (ByteBuffer) directByteBufferRConstructor.newInstance(
new Object[]{ size, addr, null, null });
} catch (InstantiationException |
IllegalAccessException |
InvocationTargetException e)
{
throw new InternalError(e);
}
return buffer;
}

private static long roundTo4096(long i)
{
return (i + 0xfffL) & ~0xfffL;
Expand Down Expand Up @@ -279,7 +319,7 @@ public void putLongVolatile(long pos, long val)
}

/**
* Reads a buffer of data.
* Reads a buffer of data, with memory copy.
*
* @param pos the position in the memory mapped file
* @param data the input buffer
Expand All @@ -291,6 +331,21 @@ public void getBytes(long pos, byte[] data, int offset, int length)
unsafe.copyMemory(null, pos + addr, data, BYTE_ARRAY_OFFSET + offset, length);
}

/**
* Get a direct byte buffer of data without memory copy.
* The returned byte buffer is read only. Any read (get)
* methods will be performed on the mapped memory (this.data)
* directly, just as the read (get) methods in MemoryMappedFile.
*
* @param pos the position in the memory mapped file
* @param length the length of the data
* @return the direct byte buffer, which is read only
*/
public ByteBuffer getDirectByteBuffer(long pos, int length)
{
return newDirectByteBufferR(length, pos + addr);
}

/**
* Writes a buffer of data.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ public static PixelsCacheReader.Builder newBuilder()
return new PixelsCacheReader.Builder();
}

public ByteBuffer get(long blockId, short rowGroupId, short columnId)
{
return this.get(blockId, rowGroupId, columnId, true);
}

/**
* Read specified columnlet from cache.
* If cache is not hit, empty byte array is returned, and an access message is sent to the mq.
Expand All @@ -98,9 +103,10 @@ public static PixelsCacheReader.Builder newBuilder()
* @param blockId block id
* @param rowGroupId row group id
* @param columnId column id
* @param direct get direct byte buffer if true
* @return columnlet content
*/
public ByteBuffer get(long blockId, short rowGroupId, short columnId)
public ByteBuffer get(long blockId, short rowGroupId, short columnId, boolean direct)
{
// search index file for columnlet id
PixelsCacheKeyUtil.getBytes(keyBuffer, blockId, rowGroupId, columnId);
Expand All @@ -116,9 +122,17 @@ public ByteBuffer get(long blockId, short rowGroupId, short columnId)
// long readBegin = System.nanoTime();
if (cacheIdx != null)
{
content = ByteBuffer.allocate(cacheIdx.length);
// read content
cacheFile.getBytes(cacheIdx.offset, content.array(), 0, cacheIdx.length);
if (direct)
{
// read content
content = cacheFile.getDirectByteBuffer(cacheIdx.offset, cacheIdx.length);
}
else
{
content = ByteBuffer.allocate(cacheIdx.length);
// read content
cacheFile.getBytes(cacheIdx.offset, content.array(), 0, cacheIdx.length);
}
}
// long readEnd = System.nanoTime();
// cacheLogger.addReadLatency(readEnd - readBegin);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,9 @@ private boolean read()
// find cached chunks
for (int colId : targetColumns)
{
// only use direct byte buffer for string columns.
boolean direct = fileSchema.getChildren().get(colId).getCategory() ==
TypeDescription.Category.STRING;
for (int rgIdx = 0; rgIdx < targetRGNum; rgIdx++)
{
int rgId = rgIdx + RGStart;
Expand All @@ -389,7 +392,7 @@ private boolean read()
// if cached, read from cache files
if (cacheOrder.contains(cacheIdentifier))
{
ColumnletId chunkId = new ColumnletId((short) rgId, (short) colId);
ColumnletId chunkId = new ColumnletId((short) rgId, (short) colId, direct);
cacheChunks.add(chunkId);
}
// if cache miss, add chunkId to be read from disks
Expand All @@ -415,7 +418,7 @@ private boolean read()
short rgId = chunkId.rowGroupId;
short colId = chunkId.columnId;
// long getBegin = System.nanoTime();
ByteBuffer columnlet = cacheReader.get(blockId, rgId, colId);
ByteBuffer columnlet = cacheReader.get(blockId, rgId, colId, chunkId.direct);
// long getEnd = System.nanoTime();
// logger.debug("[cache get]: " + columnlet.length + "," + (getEnd - getBegin));
chunkBuffers[(rgId - RGStart) * includedColumns.length + colId] = columnlet;
Expand Down Expand Up @@ -648,7 +651,7 @@ public VectorizedRowBatch readBatch(int batchSize)
PixelsProto.ColumnChunkIndex chunkIndex = rowGroupFooter.getRowGroupIndexEntry()
.getColumnChunkIndexEntries(
resultColumns[i]);
// TODO: read chunk buffer lazily when a column block is road by PixelsPageSource.
// TODO: read chunk buffer lazily when a column block is read by PixelsPageSource.
readers[i].read(chunkBuffers[index], encoding, curRowInRG, curBatchSize,
postScript.getPixelStride(), resultRowBatch.size, columnVectors[i], chunkIndex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,24 @@ public void read(ByteBuffer input, PixelsProto.ColumnEncoding encoding,
ColumnVector vector, PixelsProto.ColumnChunkIndex chunkIndex)
throws IOException
{

BinaryColumnVector columnVector = (BinaryColumnVector) vector;
if (offset == 0)
{
if (inputBuffer != null)
{
inputBuffer.release();
}
inputBuffer = Unpooled.wrappedBuffer(input);
if (input.isDirect())
{
byte[] bytes = new byte[input.limit()];
input.get(bytes);
inputBuffer = Unpooled.wrappedBuffer(bytes);
}
else
{
inputBuffer = Unpooled.wrappedBuffer(input);
}
readContent(input.limit(), encoding);
isNullOffset = (int) chunkIndex.getIsNullOffset();
hasNull = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import io.pixelsdb.pixels.cache.MemoryMappedFile;

import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Random;

Expand Down Expand Up @@ -102,7 +103,13 @@ public void run()
long begin = System.nanoTime();
while (count < round)
{
mappedFile.getBytes(offsets[count], result, 0, acSize);
//mappedFile.getBytes(offsets[count], result, 0, acSize);
ByteBuffer byteBuffer = mappedFile.getDirectByteBuffer(offsets[count], acSize);
/*for (int i = 0; i < acSize; ++i)
{
byteBuffer.get(i);
}*/
byteBuffer.get(result);
count++;
}
long end = System.nanoTime();
Expand Down