Skip to content

Commit

Permalink
[Issue #72]: fix bugs in pixels-cache and implement loading radix fro…
Browse files Browse the repository at this point in the history
…m index file. (#77)

Fixed bugs:
1. RadixNode.addChild() does not maintain children size corrently;
2. RadixNode.getLengthInBytes() overrates nodesize;
3. allocatedIndexOffset in PixelsCacheWriter is not correctly maintained.

Loading radix from index file is implemented in PixelsCacheUtil.loadRadixIndex().
  • Loading branch information
bianhq authored Sep 10, 2020
1 parent 934b174 commit f8ad798
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

/**
* pixels cache reader.
* It is not thread safe.
*
* @author guodong
* @author hank
Expand All @@ -43,20 +44,20 @@ public class PixelsCacheReader

/**
* <p>
* A node can have no more than 256 children, plus an edge (a segment of
* A node can have at most 256 children, plus an edge (a segment of
* lookup key). Each child is 8 bytes.
* </p>
* <p>
* In pixels, where will be one PixelsCacheReader on each split. And each
* split is supposed to be processed by one single thread. There are
* typically 10 - 200 concurrent threads (splits) on a machine. So that it
* is not a problem to allocate nodeEdge in each PixelsCacheReader instance.
* is not a problem to allocate nodeData in each PixelsCacheReader instance.
* By this, we can avoid frequent memory allocation in the search() method.
* </p>
* <p>
* Although PixelsRadix can support 1MB edge length, but in Pixels, we only
* For the edge, although PixelsRadix can support 1MB edge length, we only
* use 12 byte path (PixelsCacheKey). That means no edges can exceeds 12 bytes.
* So that 16 bytes here is more than enough.
* So that 16 bytes is more than enough.
* </p>
*/
private byte[] nodeData = new byte[256 * 8 + 16];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@
*/
package io.pixelsdb.pixels.cache;

import io.pixelsdb.pixels.common.exception.CacheException;
import io.pixelsdb.pixels.common.utils.Constants;
import org.apache.directory.api.util.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

/**
* pixels cache header
* index:
* - HEADER: MAGIC(6 bytes), RW(2 bytes), VERSION(4 bytes), READER_COUNT(4 bytes)
* - HEADER: MAGIC(6 bytes), RW(2 bytes), READER_COUNT(2 bytes), VERSION(4 bytes)
* - RADIX
* cache:
* - HEADER: MAGIC(6 bytes), STATUS(2 bytes), SIZE(8 bytes)
Expand All @@ -39,6 +43,8 @@
*/
public class PixelsCacheUtil
{
private final static Logger logger = LogManager.getLogger(PixelsCacheUtil.class);

public static final int MAX_READER_COUNT = 2 ^ 15 - 1;

public static final int RW_MASK;
Expand Down Expand Up @@ -215,14 +221,115 @@ public static int getIndexVersion(MemoryMappedFile indexFile)
return indexFile.getIntVolatile(10);
}

public static PixelsRadix getIndexRadix(MemoryMappedFile indexFile)
/**
* Read radix from index file.
* @param indexFile the index file to be read.
* @return the radix tree read from index file.
*/
public static PixelsRadix loadRadixIndex(MemoryMappedFile indexFile) throws CacheException
{
PixelsRadix radix = new PixelsRadix();
readRadix(indexFile, PixelsCacheUtil.INDEX_RADIX_OFFSET, radix.getRoot(), 1);
return radix;
}

/**
* Read and construct the index from index file.
* @param indexFile the index file to be read.
* @param nodeOffset the offset of the current root node of the free (or sub-tree).
* @param node the current root node to be read from index file.
* @param level the current level of the node, starts from 1 for root of the tree.
*/
private static void readRadix(MemoryMappedFile indexFile, long nodeOffset,
RadixNode node, int level) throws CacheException
{
long[] children = readNode(indexFile, nodeOffset, node, level);

if (node.isKey())
{
return;
}

if (children == null)
{
throw new CacheException("Can not read node normally.");
}

for (long childId : children)
{
// offset is in the lowest 56 bits, the highest 8 bits leader is discarded.
long childOffset = childId & 0x00FFFFFFFFFFFFFFL;
RadixNode childNode = new RadixNode();
readRadix(indexFile, childOffset, childNode, level+1);
node.addChild(childNode, true);
}
}

/**
* Read the index node from index file.
* @param indexFile the index file to be read.
* @param nodeOffset the offset of this node in index file.
* @param node the node to be read from index file.
* @param level the current level of this node.
* @return the children ids (1 byte leader + 7 bytes offset) of the node.
*/
private static long[] readNode(MemoryMappedFile indexFile, long nodeOffset,
RadixNode node, int level)
{
// TODO: read radix from index file.
return new PixelsRadix();
if (nodeOffset >= indexFile.getSize())
{
logger.debug("Offset exceeds index size. Break. Current size: " + nodeOffset);
return null;
}
int dramAccessCounter = 0;
node.offset = nodeOffset;
int nodeHeader = indexFile.getInt(nodeOffset);
dramAccessCounter++;
int nodeChildrenNum = nodeHeader & 0x000001FF;
int nodeEdgeSize = (nodeHeader & 0x7FFFFE00) >>> 9;

byte[] childrenData = new byte[nodeChildrenNum * 8];
indexFile.getBytes(nodeOffset + 4, childrenData, 0, nodeChildrenNum * 8);
/**
* To ensure the consistent endian (big-endian) in Java,
* we use ByteBuffer to wrap the bytes instead of directly getLong() from indexFile.
*/
ByteBuffer childrenBuffer = ByteBuffer.wrap(childrenData);
long[] children = new long[nodeChildrenNum];
for (int i = 0; i < nodeChildrenNum; ++i)
{
children[i] = childrenBuffer.getLong();
}
dramAccessCounter++;
byte[] edge = new byte[nodeEdgeSize];
indexFile.getBytes(nodeOffset + 4 + nodeChildrenNum * 8, edge, 0, nodeEdgeSize);
dramAccessCounter++;
node.setEdge(edge);

if (((nodeHeader >>> 31) & 1) > 0)
{
node.setKey(true);
// read value
byte[] idx = new byte[12];
indexFile.getBytes(nodeOffset + 4 + (nodeChildrenNum * 8) + nodeEdgeSize,
idx, 0, 12);
dramAccessCounter++;
PixelsCacheIdx cacheIdx = new PixelsCacheIdx(idx);
cacheIdx.dramAccessCount = dramAccessCounter;
cacheIdx.radixLevel = level;
node.setValue(cacheIdx);
}
else
{
node.setKey(false);
}

return children;
}

public static void flushRadix(MemoryMappedFile indexFile, PixelsRadix radix)
{
// TODO: flush radix is currently implemented in PixelsCacheWriter, to be moved here.
}

public static void setCacheStatus(MemoryMappedFile cacheFile, short status)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public PixelsCacheWriter build()
if (!builderOverwrite && PixelsCacheUtil.checkMagic(indexFile) && PixelsCacheUtil.checkMagic(cacheFile))
{
// cache exists in local cache file and index, reload the index.
radix = PixelsCacheUtil.getIndexRadix(indexFile);
radix = PixelsCacheUtil.loadRadixIndex(indexFile);
// build cachedColumnlets for PixelsCacheWriter.
int cachedVersion = PixelsCacheUtil.getIndexVersion(indexFile);
MetadataService metadataService = new MetadataService(
Expand Down Expand Up @@ -207,6 +207,15 @@ public MemoryMappedFile getIndexFile()
return indexFile;
}

/**
* DO NOT USE THIS METHOD. Only for unit test.
* @return
*/
public PixelsRadix getRadix()
{
return this.radix;
}

/**
* <p>
* This function is only used to bulk load all the cache content at one time.
Expand Down Expand Up @@ -747,7 +756,7 @@ private boolean flushNode(RadixNode node)
nodeBuffer.clear();
if (currentIndexOffset >= indexFile.getSize())
{
logger.debug("Index file have exceeded cache size. Break. Current size: " + currentIndexOffset);
logger.debug("Offset exceeds index size. Break. Current size: " + currentIndexOffset);
return false;
}
if (node.offset == 0)
Expand Down Expand Up @@ -786,7 +795,7 @@ private boolean flushNode(RadixNode node)
byte[] nodeBytes = new byte[node.getChildren().size() * 8];
nodeBuffer.flip();
nodeBuffer.get(nodeBytes);
indexFile.putBytes(currentIndexOffset, nodeBytes);
indexFile.putBytes(currentIndexOffset, nodeBytes); // children
currentIndexOffset += nodeBytes.length;
indexFile.putBytes(currentIndexOffset, node.getEdge()); // edge
currentIndexOffset += node.getEdge().length;
Expand All @@ -806,6 +815,7 @@ private void flushIndex()
{
// set index content offset, skip the index header.
currentIndexOffset = PixelsCacheUtil.INDEX_RADIX_OFFSET;
allocatedIndexOffset = PixelsCacheUtil.INDEX_RADIX_OFFSET;
// if root contains nodes, which means the tree is not empty,then write nodes.
if (radix.getRoot().getSize() != 0)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ public PixelsRadix()
nodes.add(root); // add root node
}

private PixelsRadix(DynamicArray<RadixNode> nodes)
{
this.nodes = nodes;
}

public static PixelsRadix wrap(DynamicArray<RadixNode> nodes)
{
if (nodes != null && nodes.size() > 0)
{
return new PixelsRadix(nodes);
}
return null;
}

public RadixNode getRoot()
{
return nodes.get(0);
Expand Down
20 changes: 17 additions & 3 deletions pixels-cache/src/main/java/io/pixelsdb/pixels/cache/RadixNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

/**
* @author guodong
* @author hank
*/
public class RadixNode
{
Expand Down Expand Up @@ -76,8 +77,16 @@ public void addChild(RadixNode child, boolean overwrite)
{
return;
}
children.put(firstByte, child);
size++;
if (children.put(firstByte, child) == null)
{
/**
* Issue #72:
* size was increased no matter firstByte exists or not,
* so that size could become larger than exact children number.
* Fix it in this issue.
*/
size++;
}
}

public void removeChild(RadixNode child)
Expand Down Expand Up @@ -127,7 +136,12 @@ public PixelsCacheIdx getValue()
public int getLengthInBytes()
{
int len = 4 + edge.length; // header
len += 1 * children.size(); // leaders
/**
* Issue #72:
* leader is already contained in the highest 8 bits in each children (id),
* no needed to allocate memory space for leaders.
*/
// len += 1 * children.size(); // leaders
len += 8 * children.size(); // offsets
// value
if (isKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public void testSimpleCacheWriter()
FileSystem fs = FileSystem.get(URI.create(cacheConfig.getWarehousePath()), conf);
PixelsCacheWriter cacheWriter =
PixelsCacheWriter.newBuilder()
.setCacheLocation("/Users/Jelly/Desktop/pixels.cache")
.setCacheLocation("/home/hank/Desktop/pixels.cache")
.setCacheSize(1024 * 1024 * 64L)
.setIndexLocation("/Users/Jelly/Desktop/pixels.index")
.setIndexLocation("/home/hank/Desktop/pixels.index")
.setIndexSize(1024 * 1024 * 64L)
.setOverwrite(true)
.setFS(fs)
Expand All @@ -65,6 +65,11 @@ public void testSimpleCacheWriter()
}
}
cacheWriter.flush();
PixelsRadix radix = cacheWriter.getRadix();
radix.printStats();

PixelsRadix radix1 = PixelsCacheUtil.loadRadixIndex(cacheWriter.getIndexFile());
radix1.printStats();
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2018 PixelsDB.
*
* This file is part of Pixels.
*
* Pixels is free software: you can redistribute it and/or modify
* it under the terms of the Affero GNU General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Pixels is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Affero GNU General Public License for more details.
*
* You should have received a copy of the Affero GNU General Public
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.common.exception;

/**
* Created at: 2020-09-10 @ Chavannes, Switzerland.
* Author: hank
*/
public class CacheException extends Exception
{
public CacheException(String msg)
{
super(msg);
}
}

0 comments on commit f8ad798

Please sign in to comment.